diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 3133e1c3b0..e7734161f4 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -257,7 +257,7 @@ interface CordaRPCOps : RPCOps { * complete with an exception if it is unable to. */ @RPCReturnsObservables - fun waitUntilRegisteredWithNetworkMap(): CordaFuture + fun waitUntilNetworkReady(): CordaFuture // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // the node's state locally and query that directly. @@ -299,6 +299,11 @@ interface CordaRPCOps : RPCOps { * @return the node info if available. */ fun nodeIdentityFromParty(party: AbstractParty): NodeInfo? + + /** + * Clear all network map data from local node cache. + */ + fun clearNetworkMapCache() } inline fun CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), diff --git a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt index d7f30a4f50..c802831d0f 100644 --- a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt +++ b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt @@ -1,12 +1,16 @@ package net.corda.core.node +import net.corda.core.crypto.locationOrNull import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.NodeInfoSchemaV1 import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NonEmptySet +import net.corda.core.serialization.serialize /** * Information for an advertised service including the service specific identity information. @@ -21,11 +25,13 @@ data class ServiceEntry(val info: ServiceInfo, val identity: PartyAndCertificate // TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures. @CordaSerializable data class NodeInfo(val addresses: List, - val legalIdentityAndCert: PartyAndCertificate, //TODO This field will be removed in future PR which gets rid of services. - val legalIdentitiesAndCerts: NonEmptySet, + // TODO After removing of services these two fields will be merged together and made NonEmptySet. + val legalIdentityAndCert: PartyAndCertificate, + val legalIdentitiesAndCerts: Set, val platformVersion: Int, val advertisedServices: List = emptyList(), - val worldMapLocation: WorldMapLocation? = null) { + val serial: Long +) { init { require(advertisedServices.none { it.identity == legalIdentityAndCert }) { "Service identities must be different from node legal identity" @@ -37,4 +43,12 @@ data class NodeInfo(val addresses: List, fun serviceIdentities(type: ServiceType): List { return advertisedServices.mapNotNull { if (it.info.type.isSubTypeOf(type)) it.identity.party else null } } + + /** + * Uses node's owner X500 name to infer the node's location. Used in Explorer in map view. + */ + fun getWorldMapLocation(): WorldMapLocation? { + val nodeOwnerLocation = legalIdentity.name.locationOrNull + return nodeOwnerLocation?.let { CityDatabase[it] } + } } diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 33fbd1f10e..3f854a44fc 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.randomOrNull import net.corda.core.messaging.DataFeed import net.corda.core.node.NodeInfo import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.NetworkHostAndPort import org.bouncycastle.asn1.x500.X500Name import rx.Observable import java.security.PublicKey @@ -44,7 +45,7 @@ interface NetworkMapCache { /** Tracks changes to the network map cache */ val changed: Observable /** Future to track completion of the NetworkMapService registration. */ - val mapServiceRegistered: CordaFuture + val nodeReady: CordaFuture /** * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the @@ -76,7 +77,10 @@ interface NetworkMapCache { fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? /** Look up the node info for a legal name. */ - fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal } + fun getNodeByLegalName(principal: X500Name): NodeInfo? + + /** Look up the node info for a host and port. */ + fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? /** * In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of @@ -144,4 +148,9 @@ interface NetworkMapCache { "Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.") return notary.advertisedServices.any { it.info.type.isValidatingNotary() } } + + /** + * Clear all network map data from local node cache. + */ + fun clearNetworkMapCache() } diff --git a/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt b/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt new file mode 100644 index 0000000000..f0d21aeaf3 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt @@ -0,0 +1,142 @@ +package net.corda.core.schemas + +import net.corda.core.crypto.toBase58String +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.node.NodeInfo +import net.corda.core.node.ServiceEntry +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import java.io.Serializable +import java.security.cert.CertPath +import javax.persistence.CascadeType +import javax.persistence.Column +import javax.persistence.ElementCollection +import javax.persistence.Embeddable +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.GeneratedValue +import javax.persistence.Id +import javax.persistence.JoinColumn +import javax.persistence.JoinTable +import javax.persistence.Lob +import javax.persistence.ManyToMany +import javax.persistence.OneToMany +import javax.persistence.Table + +object NodeInfoSchema + +object NodeInfoSchemaV1 : MappedSchema( + schemaFamily = NodeInfoSchema.javaClass, + version = 1, + mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java) +) { + @Entity + @Table(name = "node_infos") + class PersistentNodeInfo( + @Id + @GeneratedValue + @Column(name = "node_info_id") + var id: Int, + + @Column(name = "addresses") + @OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true) + val addresses: List, + + @Column(name = "legal_identities_certs") + @ManyToMany(cascade = arrayOf(CascadeType.ALL)) + @JoinTable(name = "link_nodeinfo_party", + joinColumns = arrayOf(JoinColumn(name="node_info_id")), + inverseJoinColumns = arrayOf(JoinColumn(name="party_name"))) + val legalIdentitiesAndCerts: Set, + + @Column(name = "platform_version") + val platformVersion: Int, + + @Column(name = "advertised_services") + @ElementCollection + var advertisedServices: List = emptyList(), + + /** + * serial is an increasing value which represents the version of [NodeInfo]. + * Not expected to be sequential, but later versions of the registration must have higher values + * Similar to the serial number on DNS records. + */ + @Column(name = "serial") + val serial: Long + ) { + fun toNodeInfo(): NodeInfo { + return NodeInfo( + this.addresses.map { it.toHostAndPort() }, + this.legalIdentitiesAndCerts.filter { it.isMain }.single().toLegalIdentityAndCert(), // TODO Workaround, it will be changed after PR with services removal. + this.legalIdentitiesAndCerts.filter { !it.isMain }.map { it.toLegalIdentityAndCert() }.toSet(), + this.platformVersion, + this.advertisedServices.map { + it.serviceEntry?.deserialize() ?: throw IllegalStateException("Service entry shouldn't be null") + }, + this.serial + ) + } + } + + @Embeddable + data class PKHostAndPort( + val host: String? = null, + val port: Int? = null + ) : Serializable + + @Entity + data class DBHostAndPort( + @EmbeddedId + private val pk: PKHostAndPort + ) { + companion object { + fun fromHostAndPort(hostAndPort: NetworkHostAndPort) = DBHostAndPort( + PKHostAndPort(hostAndPort.host, hostAndPort.port) + ) + } + fun toHostAndPort(): NetworkHostAndPort { + return NetworkHostAndPort(this.pk.host!!, this.pk.port!!) + } + } + + @Embeddable // TODO To be removed with services. + data class DBServiceEntry( + @Column(length = 65535) + val serviceEntry: ByteArray? = null + ) + + /** + * PartyAndCertificate entity (to be replaced by referencing final Identity Schema). + */ + @Entity + @Table(name = "node_info_party_cert") + data class DBPartyAndCertificate( + @Id + @Column(name = "owning_key", length = 65535, nullable = false) + val owningKey: String, + + //@Id // TODO Do we assume that names are unique? Note: We can't have it as Id, because our toString on X500 is inconsistent. + @Column(name = "party_name", nullable = false) + val name: String, + + @Column(name = "certificate") + @Lob + val certificate: ByteArray, + + @Column(name = "certificate_path") + @Lob + val certPath: ByteArray, + + val isMain: Boolean, + + @ManyToMany(mappedBy = "legalIdentitiesAndCerts", cascade = arrayOf(CascadeType.ALL)) // ManyToMany because of distributed services. + private val persistentNodeInfos: Set = emptySet() + ) { + constructor(partyAndCert: PartyAndCertificate, isMain: Boolean = false) + : this(partyAndCert.party.owningKey.toBase58String(), partyAndCert.party.name.toString(), partyAndCert.certificate.serialize().bytes, partyAndCert.certPath.serialize().bytes, isMain) + fun toLegalIdentityAndCert(): PartyAndCertificate { + return PartyAndCertificate(certPath.deserialize()) + } + } +} diff --git a/docs/source/api-persistence.rst b/docs/source/api-persistence.rst index 8844f80b4a..fbfb0c5d98 100644 --- a/docs/source/api-persistence.rst +++ b/docs/source/api-persistence.rst @@ -17,9 +17,9 @@ The ORM mapping is specified using the `Java Persistence API (dummyTopic, sessionId).getOrThrow(5.seconds) - assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) assertThat(response).isEqualTo(responseMessage) } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 0e32802348..788d76ae00 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -67,7 +67,7 @@ class P2PSecurityTest : NodeBasedTest() { private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): CordaFuture { val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public) - val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1) + val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1, serial = 1) val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress) return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 146f391689..c21f7bd800 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -12,6 +12,7 @@ import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.* +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps @@ -39,7 +40,7 @@ import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.sendRequest -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NetworkMapService.RegistrationResponse @@ -138,10 +139,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, var isPreviousCheckpointsPresent = false private set - protected val _networkMapRegistrationFuture = openFuture() - /** Completes once the node has successfully registered with the network map service */ - val networkMapRegistrationFuture: CordaFuture - get() = _networkMapRegistrationFuture + protected val _nodeReadyFuture = openFuture() + /** Completes once the node has successfully registered with the network map service + * or has loaded network map data from local database */ + val nodeReadyFuture: CordaFuture + get() = _nodeReadyFuture /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ open val pluginRegistries: List by lazy { @@ -155,10 +157,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, /** The implementation of the [CordaRPCOps] interface used by this node. */ open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM. - open fun findMyLocation(): WorldMapLocation? { - return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] } - } - open fun start() { require(!started) { "Node has already been started" } @@ -208,7 +206,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } runOnStop += network::stop - _networkMapRegistrationFuture.captureLater(registerWithNetworkMapIfConfigured()) + } + // If we successfully loaded network data from database, we set this future to Unit. + _nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured()) + database.transaction { smm.start() // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } @@ -483,9 +484,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo { val advertisedServiceEntries = makeServiceEntries() - val allIdentities = (advertisedServiceEntries.map { it.identity } + legalIdentity).toNonEmptySet() + val allIdentities = advertisedServiceEntries.map { it.identity }.toSet() // TODO Add node's legalIdentity (after services removal). val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet. - return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, findMyLocation()) + return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, platformClock.instant().toEpochMilli()) } /** @@ -580,7 +581,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, services.networkMapCache.runWithoutMapService() noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache } else { - registerWithNetworkMap() + val netMapRegistration = registerWithNetworkMap() + // We may want to start node immediately with database data and not wait for network map registration (but send it either way). + // So we are ready to go. + if (services.networkMapCache.loadDBSuccess) { + log.info("Node successfully loaded network map data from the database.") + doneFuture(Unit) + } else { + netMapRegistration + } } } @@ -606,7 +615,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // Register this node against the network val instant = platformClock.instant() val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires) + val reg = NodeRegistration(info, info.serial, ADD, expires) val request = RegistrationRequest(reg.toWire(services.keyManagementService, info.legalIdentityAndCert.owningKey), network.myAddress) return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress) } @@ -616,9 +625,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, /** This is overriden by the mock node implementation to enable operation without any network map service */ protected open fun noNetworkMapConfigured(): CordaFuture { - // TODO: There should be a consistent approach to configuration error exceptions. - throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " + - "has any other map node been configured.") + if (services.networkMapCache.loadDBSuccess) { + return doneFuture(Unit) + } else { + // TODO: There should be a consistent approach to configuration error exceptions. + throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " + + "has any other map node been configured.") + } } protected open fun makeKeyManagementService(identityService: IdentityService): KeyManagementService { @@ -754,7 +767,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val monitoringService = MonitoringService(MetricRegistry()) override val validatedTransactions = makeTransactionStorage() override val transactionVerifierService by lazy { makeTransactionVerifierService() } - override val networkMapCache by lazy { InMemoryNetworkMapCache(this) } + override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) } + override val networkMapCache by lazy { PersistentNetworkMapCache(this) } override val vaultService by lazy { NodeVaultService(this) } override val vaultQueryService by lazy { HibernateVaultQueryImpl(database.hibernateConfig, vaultService) @@ -776,7 +790,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val networkService: MessagingService get() = network override val clock: Clock get() = platformClock override val myInfo: NodeInfo get() = info - override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) } override val database: CordaPersistence get() = this@AbstractNode.database override val configuration: NodeConfiguration get() = this@AbstractNode.configuration diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 79ca9d2f22..e87359f0a6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -173,7 +173,7 @@ class CordaRPCOpsImpl( override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass) override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state) override fun currentNodeTime(): Instant = Instant.now(services.clock) - override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered + override fun waitUntilNetworkReady() = services.networkMapCache.nodeReady override fun partyFromAnonymous(party: AbstractParty): Party? = services.identityService.partyFromAnonymous(party) override fun partyFromKey(key: PublicKey) = services.identityService.partyFromKey(key) override fun partyFromX500Name(x500Name: X500Name) = services.identityService.partyFromX500Name(x500Name) @@ -182,6 +182,10 @@ class CordaRPCOpsImpl( override fun registeredFlows(): List = services.rpcFlows.map { it.name }.sorted() + override fun clearNetworkMapCache() { + services.networkMapCache.clearNetworkMapCache() + } + companion object { private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track()) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b30478e237..27016f1617 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -155,7 +155,7 @@ open class Node(override val configuration: FullNodeConfiguration, myIdentityOrNullIfNetworkMapService, serverThread, database, - networkMapRegistrationFuture, + nodeReadyFuture, services.monitoringService, advertisedAddress) } @@ -210,15 +210,17 @@ open class Node(override val configuration: FullNodeConfiguration, log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" } val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { - initialConnectAttempts = 5 - retryInterval = 5.seconds.toMillis() + initialConnectAttempts = 2 // TODO Public host discovery needs rewriting, as we may start nodes without network map, and we don't want to wait that long on startup. + retryInterval = 2.seconds.toMillis() retryIntervalMultiplier = 1.5 maxRetryInterval = 3.minutes.toMillis() } val clientFactory = try { locator.createSessionFactory() } catch (e: ActiveMQNotConnectedException) { - throw IOException("Unable to connect to the Network Map Service at $serverAddress for IP address discovery", e) + log.warn("Unable to connect to the Network Map Service at $serverAddress for IP address discovery. " + + "Using the provided \"${configuration.p2pAddress.host}\" as the advertised address.") + return null } val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) @@ -306,7 +308,7 @@ open class Node(override val configuration: FullNodeConfiguration, } super.start() - networkMapRegistrationFuture.thenMatch({ + nodeReadyFuture.thenMatch({ serverThread.execute { // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: // diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 5641cb8953..2d55019454 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -102,7 +102,7 @@ open class NodeStartup(val args: Array) { node.start() printPluginsAndServices(node) - node.networkMapRegistrationFuture.thenMatch({ + node.nodeReadyFuture.thenMatch({ val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0 // TODO: Replace this with a standard function to get an unambiguous rendering of the X.500 name. val name = node.info.legalIdentity.name.orgName ?: node.info.legalIdentity.name.commonName diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 69e010892b..4d2bbfccbf 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -53,6 +53,9 @@ interface NetworkMapCacheInternal : NetworkMapCache { /** For testing where the network map cache is manipulated marks the service as immediately ready. */ @VisibleForTesting fun runWithoutMapService() + + /** Indicates if loading network map data from database was successful. */ + val loadDBSuccess: Boolean } @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 829c36bfbf..49ff18c942 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -338,6 +338,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. */ private fun updateBridgesOnNetworkChange(change: MapChange) { + log.debug { "Updating bridges on network map change: ${change.node}" } fun gatherAddresses(node: NodeInfo): Sequence { val peerAddress = getArtemisPeerAddress(node) val addresses = mutableListOf(peerAddress) diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt deleted file mode 100644 index 774e93f499..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ /dev/null @@ -1,185 +0,0 @@ -package net.corda.node.services.network - -import net.corda.core.concurrent.CordaFuture -import net.corda.core.identity.AbstractParty -import net.corda.core.identity.Party -import net.corda.core.internal.VisibleForTesting -import net.corda.core.internal.bufferUntilSubscribed -import net.corda.core.internal.concurrent.map -import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub -import net.corda.core.node.services.NetworkMapCache.MapChange -import net.corda.core.node.services.PartyInfo -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.NetworkCacheError -import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.messaging.createMessage -import net.corda.node.services.messaging.sendRequest -import net.corda.node.services.network.NetworkMapService.FetchMapResponse -import net.corda.node.services.network.NetworkMapService.SubscribeResponse -import net.corda.node.utilities.AddOrRemove -import net.corda.node.utilities.bufferUntilDatabaseCommit -import net.corda.node.utilities.wrapWithDatabaseTransaction -import rx.Observable -import rx.subjects.PublishSubject -import java.security.PublicKey -import java.security.SignatureException -import java.util.* -import javax.annotation.concurrent.ThreadSafe - -/** - * Extremely simple in-memory cache of the network map. - * - * @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather - * than the identity service directly, as this avoids problems with service start sequence (network map cache - * and identity services depend on each other). Should always be provided except for unit test cases. - */ -@ThreadSafe -open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : SingletonSerializeAsToken(), NetworkMapCacheInternal { - companion object { - val logger = loggerFor() - } - - override val partyNodes: List get() = registeredNodes.map { it.value } - override val networkMapNodes: List get() = getNodesWithService(NetworkMapService.type) - private val _changed = PublishSubject.create() - // We use assignment here so that multiple subscribers share the same wrapped Observable. - override val changed: Observable = _changed.wrapWithDatabaseTransaction() - private val changePublisher: rx.Observer get() = _changed.bufferUntilDatabaseCommit() - - private val _registrationFuture = openFuture() - override val mapServiceRegistered: CordaFuture get() = _registrationFuture - - private var registeredForPush = false - protected var registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) - - override fun getPartyInfo(party: Party): PartyInfo? { - val node = registeredNodes[party.owningKey] - if (node != null) { - return PartyInfo.Node(node) - } - for ((_, value) in registeredNodes) { - for (service in value.advertisedServices) { - if (service.identity.party == party) { - return PartyInfo.Service(service) - } - } - } - return null - } - - override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey] - override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { - val wellKnownParty = if (serviceHub != null) { - serviceHub.identityService.partyFromAnonymous(party) - } else { - party - } - - return wellKnownParty?.let { - getNodeByLegalIdentityKey(it.owningKey) - } - } - - override fun track(): DataFeed, MapChange> { - synchronized(_changed) { - return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) - } - } - - override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean, - ifChangedSinceVer: Int?): CordaFuture { - if (subscribe && !registeredForPush) { - // Add handler to the network, for updates received from the remote network map service. - network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ -> - try { - val req = message.data.deserialize() - val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, - data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) - network.send(ackMessage, req.replyTo) - processUpdatePush(req) - } catch(e: NodeMapError) { - logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") - } catch(e: Exception) { - logger.error("Exception processing update from network map service", e) - } - } - registeredForPush = true - } - - // Fetch the network map and register for updates at the same time - val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress) - val future = network.sendRequest(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) -> - // We may not receive any nodes back, if the map hasn't changed since the version specified - nodes?.forEach { processRegistration(it) } - Unit - } - _registrationFuture.captureLater(future.map { null }) - - return future - } - - override fun addNode(node: NodeInfo) { - synchronized(_changed) { - val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node) - if (previousNode == null) { - changePublisher.onNext(MapChange.Added(node)) - } else if (previousNode != node) { - changePublisher.onNext(MapChange.Modified(node, previousNode)) - } - } - } - - override fun removeNode(node: NodeInfo) { - synchronized(_changed) { - registeredNodes.remove(node.legalIdentity.owningKey) - changePublisher.onNext(MapChange.Removed(node)) - } - } - - /** - * Unsubscribes from updates from the given map service. - * @param service the network map service to listen to updates from. - */ - override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture { - // Fetch the network map and register for updates at the same time - val req = NetworkMapService.SubscribeRequest(false, network.myAddress) - // `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo. - val address = network.getAddressOfParty(PartyInfo.Node(service)) - val future = network.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map { - if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() - } - _registrationFuture.captureLater(future.map { null }) - return future - } - - fun processUpdatePush(req: NetworkMapService.Update) { - try { - val reg = req.wireReg.verified() - processRegistration(reg) - } catch (e: SignatureException) { - throw NodeMapError.InvalidSignature() - } - } - - private fun processRegistration(reg: NodeRegistration) { - // TODO: Implement filtering by sequence number, so we only accept changes that are - // more recent than the latest change we've processed. - when (reg.type) { - AddOrRemove.ADD -> addNode(reg.node) - AddOrRemove.REMOVE -> removeNode(reg.node) - } - } - - @VisibleForTesting - override fun runWithoutMapService() { - _registrationFuture.set(null) - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt new file mode 100644 index 0000000000..74a437817b --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -0,0 +1,339 @@ +package net.corda.node.services.network + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.crypto.parsePublicKeyBase58 +import net.corda.core.crypto.toBase58String +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.Party +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.NetworkMapCache.MapChange +import net.corda.core.node.services.PartyInfo +import net.corda.core.schemas.NodeInfoSchemaV1 +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor +import net.corda.node.services.api.NetworkCacheError +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.createMessage +import net.corda.node.services.messaging.sendRequest +import net.corda.node.services.network.NetworkMapService.FetchMapResponse +import net.corda.node.services.network.NetworkMapService.SubscribeResponse +import net.corda.node.utilities.AddOrRemove +import net.corda.node.utilities.DatabaseTransactionManager +import net.corda.node.utilities.bufferUntilDatabaseCommit +import net.corda.node.utilities.wrapWithDatabaseTransaction +import org.bouncycastle.asn1.x500.X500Name +import org.hibernate.Session +import rx.Observable +import rx.subjects.PublishSubject +import java.security.PublicKey +import java.security.SignatureException +import java.util.* +import javax.annotation.concurrent.ThreadSafe + +/** + * Extremely simple in-memory cache of the network map. + * + * @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather + * than the identity service directly, as this avoids problems with service start sequence (network map cache + * and identity services depend on each other). Should always be provided except for unit test cases. + */ +@ThreadSafe +open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) : SingletonSerializeAsToken(), NetworkMapCacheInternal { + companion object { + val logger = loggerFor() + } + + private var registeredForPush = false + // TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in + // next PR that gets rid of services. These maps are used only for queries by service. + override val partyNodes: List get() = registeredNodes.map { it.value } + override val networkMapNodes: List get() = getNodesWithService(NetworkMapService.type) + private val _changed = PublishSubject.create() + // We use assignment here so that multiple subscribers share the same wrapped Observable. + override val changed: Observable = _changed.wrapWithDatabaseTransaction() + private val changePublisher: rx.Observer get() = _changed.bufferUntilDatabaseCommit() + + private val _registrationFuture = openFuture() + override val nodeReady: CordaFuture get() = _registrationFuture + protected val registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) + private var _loadDBSuccess: Boolean = false + override val loadDBSuccess get() = _loadDBSuccess + + init { + serviceHub.database.transaction { loadFromDB() } + } + + override fun getPartyInfo(party: Party): PartyInfo? { + val nodes = serviceHub.database.transaction { queryByIdentityKey(party.owningKey) } + if (nodes.size == 1 && nodes[0].legalIdentity == party) { + return PartyInfo.Node(nodes[0]) + } + for (node in nodes) { + for (service in node.advertisedServices) { + if (service.identity.party == party) { + return PartyInfo.Service(service) + } + } + } + return null + } + + // TODO See comment to queryByLegalName why it's left like that. + override fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal } + //serviceHub!!.database.transaction { queryByLegalName(principal).firstOrNull() } + override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = + serviceHub.database.transaction { queryByIdentityKey(identityKey).firstOrNull() } + override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { + val wellKnownParty = serviceHub.identityService.partyFromAnonymous(party) + return wellKnownParty?.let { + getNodeByLegalIdentityKey(it.owningKey) + } + } + + override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = serviceHub.database.transaction { queryByAddress(address) } + + override fun track(): DataFeed, MapChange> { + synchronized(_changed) { + return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + } + } + + override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean, + ifChangedSinceVer: Int?): CordaFuture { + if (subscribe && !registeredForPush) { + // Add handler to the network, for updates received from the remote network map service. + network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ -> + try { + val req = message.data.deserialize() + val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, + data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) + network.send(ackMessage, req.replyTo) + processUpdatePush(req) + } catch(e: NodeMapError) { + logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") + } catch(e: Exception) { + logger.error("Exception processing update from network map service", e) + } + } + registeredForPush = true + } + + // Fetch the network map and register for updates at the same time + val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress) + val future = network.sendRequest(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) -> + // We may not receive any nodes back, if the map hasn't changed since the version specified + nodes?.forEach { processRegistration(it) } + Unit + } + _registrationFuture.captureLater(future.map { null }) + + return future + } + + override fun addNode(node: NodeInfo) { + synchronized(_changed) { + val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node) + if (previousNode == null) { + serviceHub.database.transaction { + updateInfoDB(node) + changePublisher.onNext(MapChange.Added(node)) + } + } else if (previousNode != node) { + serviceHub.database.transaction { + updateInfoDB(node) + changePublisher.onNext(MapChange.Modified(node, previousNode)) + } + } + } + } + + override fun removeNode(node: NodeInfo) { + synchronized(_changed) { + registeredNodes.remove(node.legalIdentity.owningKey) + serviceHub.database.transaction { + removeInfoDB(node) + changePublisher.onNext(MapChange.Removed(node)) + } + } + } + + /** + * Unsubscribes from updates from the given map service. + * @param service the network map service to listen to updates from. + */ + override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture { + // Fetch the network map and register for updates at the same time + val req = NetworkMapService.SubscribeRequest(false, network.myAddress) + // `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo. + val address = network.getAddressOfParty(PartyInfo.Node(service)) + val future = network.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map { + if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() + } + _registrationFuture.captureLater(future.map { null }) + return future + } + + fun processUpdatePush(req: NetworkMapService.Update) { + try { + val reg = req.wireReg.verified() + processRegistration(reg) + } catch (e: SignatureException) { + throw NodeMapError.InvalidSignature() + } + } + + private fun processRegistration(reg: NodeRegistration) { + // TODO: Implement filtering by sequence number, so we only accept changes that are + // more recent than the latest change we've processed. + when (reg.type) { + AddOrRemove.ADD -> addNode(reg.node) + AddOrRemove.REMOVE -> removeNode(reg.node) + } + } + + @VisibleForTesting + override fun runWithoutMapService() { + _registrationFuture.set(null) + } + + // Changes related to NetworkMap redesign + // TODO It will be properly merged into network map cache after services removal. + + private inline fun createSession(block: (Session) -> T): T { + return DatabaseTransactionManager.current().session.let { block(it) } + } + + private fun getAllInfos(session: Session): List { + val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java) + criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java)) + return session.createQuery(criteria).resultList + } + + /** + * Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured. + */ + private fun loadFromDB() { + logger.info("Loading network map from database...") + createSession { + val result = getAllInfos(it) + for (nodeInfo in result) { + try { + logger.info("Loaded node info: $nodeInfo") + val publicKey = parsePublicKeyBase58(nodeInfo.legalIdentitiesAndCerts.single { it.isMain }.owningKey) + val node = nodeInfo.toNodeInfo() + registeredNodes.put(publicKey, node) + changePublisher.onNext(MapChange.Added(node)) // Redeploy bridges after reading from DB on startup. + _loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready. + } catch (e: Exception) { + logger.warn("Exception parsing network map from the database.", e) + } + } + if (loadDBSuccess) { + _registrationFuture.set(null) // Useful only if we don't have NetworkMapService configured so StateMachineManager can start. + } + } + } + + private fun updateInfoDB(nodeInfo: NodeInfo) { + // TODO Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing + // network map registration on network map node) + val session = serviceHub.database.entityManagerFactory.withOptions().connection(serviceHub.database.dataSource.connection + .apply { + transactionIsolation = 1 + }).openSession() + + val tx = session.beginTransaction() + // TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo? + val info = findByIdentityKey(session, nodeInfo.legalIdentity.owningKey) + val nodeInfoEntry = generateMappedObject(nodeInfo) + if (info.isNotEmpty()) { + nodeInfoEntry.id = info[0].id + } + session.merge(nodeInfoEntry) + tx.commit() + session.close() + } + + private fun removeInfoDB(nodeInfo: NodeInfo) { + createSession { + val info = findByIdentityKey(it, nodeInfo.legalIdentity.owningKey).single() + it.remove(info) + } + } + + private fun findByIdentityKey(session: Session, identityKey: PublicKey): List { + val query = session.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKey = :owningKey", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("owningKey", identityKey.toBase58String()) + return query.resultList + } + + private fun queryByIdentityKey(identityKey: PublicKey): List { + createSession { + val result = findByIdentityKey(it, identityKey) + return result.map { it.toNodeInfo() } + } + } + + // TODO It's useless for now, because toString on X500 names is inconsistent and we have: + // C=ES,L=Madrid,O=Alice Corp,CN=Alice Corp + // CN=Alice Corp,O=Alice Corp,L=Madrid,C=ES + private fun queryByLegalName(name: X500Name): List { + createSession { + val query = it.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("name", name.toString()) + val result = query.resultList + return result.map { it.toNodeInfo() } + } + } + + private fun queryByAddress(hostAndPort: NetworkHostAndPort): NodeInfo? { + createSession { + val query = it.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.addresses a WHERE a.pk.host = :host AND a.pk.port = :port", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("host", hostAndPort.host) + query.setParameter("port", hostAndPort.port) + val result = query.resultList + return if (result.isEmpty()) null + else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port") + } + } + + /** Object Relational Mapping support. */ + private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo { + return NodeInfoSchemaV1.PersistentNodeInfo( + id = 0, + addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) }, + legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.map { NodeInfoSchemaV1.DBPartyAndCertificate(it) }.toSet() + // TODO It's workaround to keep the main identity, will be removed in future PR getting rid of services. + + NodeInfoSchemaV1.DBPartyAndCertificate(nodeInfo.legalIdentityAndCert, isMain = true), + platformVersion = nodeInfo.platformVersion, + advertisedServices = nodeInfo.advertisedServices.map { NodeInfoSchemaV1.DBServiceEntry(it.serialize().bytes) }, + serial = nodeInfo.serial + ) + } + + override fun clearNetworkMapCache() { + serviceHub.database.transaction { + createSession { + val result = getAllInfos(it) + for (nodeInfo in result) it.remove(nodeInfo) + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 331ef4461d..0b53876af9 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.LinearState import net.corda.core.schemas.CommonSchemaV1 import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.NodeInfoSchemaV1 import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState import net.corda.core.serialization.SingletonSerializeAsToken @@ -58,6 +59,7 @@ class NodeSchemaService(customSchemas: Set = emptySet()) : SchemaS val requiredSchemas: Map = mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()), Pair(VaultSchemaV1, SchemaService.SchemaOptions()), + Pair(NodeInfoSchemaV1, SchemaService.SchemaOptions()), Pair(NodeServicesV1, SchemaService.SchemaOptions())) override val schemaOptions: Map = requiredSchemas.plus(customSchemas.map { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 4c2e3b0368..a20c6aaca3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -39,6 +39,7 @@ import net.corda.node.utilities.wrapWithDatabaseTransaction import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl import net.corda.nodeapi.internal.serialization.withTokenContext import org.apache.activemq.artemis.utils.ReusableLatch +import org.bouncycastle.asn1.x500.X500Name import org.slf4j.Logger import rx.Observable import rx.subjects.PublishSubject @@ -170,7 +171,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, checkQuasarJavaAgentPresence() restoreFibersFromCheckpoints() listenToLedgerTransactions() - serviceHub.networkMapCache.mapServiceRegistered.then { executor.execute(this::resumeRestoredFibers) } + serviceHub.networkMapCache.nodeReady.then { executor.execute(this::resumeRestoredFibers) } } private fun checkQuasarJavaAgentPresence() { diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 0280afd686..c8bdfb9287 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -11,12 +11,12 @@ import net.corda.core.node.services.VaultService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.days -import net.corda.node.services.MockServiceHubInternal import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.vault.NodeVaultService +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index c095eb86fb..3d1942c01c 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -12,9 +12,10 @@ import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase @@ -54,8 +55,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { var messagingClient: NodeMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null - // TODO: We should have a dummy service hub rather than change behaviour in tests - val networkMapCache = InMemoryNetworkMapCache(serviceHub = null) + lateinit var networkMapCache: PersistentNetworkMapCache val rpcOps = object : RPCOps { override val protocolVersion: Int get() = throw UnsupportedOperationException() @@ -71,6 +71,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = ::makeTestIdentityService) networkMapRegistrationFuture = doneFuture(Unit) + networkMapCache = PersistentNetworkMapCache(serviceHub = object : MockServiceHubInternal(database, config) {}) } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt similarity index 73% rename from node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt rename to node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt index 0b4cdb9a72..ff370836c0 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt @@ -6,13 +6,14 @@ import net.corda.core.utilities.getOrThrow import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.testing.node.MockNetwork +import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test import java.math.BigInteger import kotlin.test.assertEquals -class InMemoryNetworkMapCacheTest { +class NetworkMapCacheTest { lateinit var mockNet: MockNetwork @Before @@ -47,9 +48,7 @@ class InMemoryNetworkMapCacheTest { // Node A currently knows only about itself, so this returns node A assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info) - nodeA.database.transaction { - nodeA.services.networkMapCache.addNode(nodeB.info) - } + nodeA.services.networkMapCache.addNode(nodeB.info) // The details of node B write over those for node A assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info) } @@ -68,4 +67,20 @@ class InMemoryNetworkMapCacheTest { // TODO: Should have a test case with anonymous lookup } + + @Test + fun `remove node from cache`() { + val nodes = mockNet.createSomeNodes(1) + val n0 = nodes.mapNode + val n1 = nodes.partyNodes[0] + val node0Cache = n0.services.networkMapCache as PersistentNetworkMapCache + mockNet.runNetwork() + n0.database.transaction { + assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) != null) + node0Cache.removeNode(n1.info) + assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) == null) + assertThat(node0Cache.getNodeByLegalIdentity(n0.info.legalIdentity) != null) + assertThat(node0Cache.getNodeByLegalName(n1.info.legalIdentity.name) == null) + } + } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt new file mode 100644 index 0000000000..b5dae10615 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -0,0 +1,194 @@ +package net.corda.node.services.network + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.toBase58String +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.identity.Party +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.core.utilities.unwrap +import net.corda.node.internal.Node +import net.corda.testing.ALICE +import net.corda.testing.BOB +import net.corda.testing.CHARLIE +import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.node.NodeBasedTest +import org.assertj.core.api.Assertions.assertThat +import org.bouncycastle.asn1.x500.X500Name +import org.junit.Before +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertFails + +class PersistentNetworkMapCacheTest : NodeBasedTest() { + val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB) + val addressesMap: HashMap = HashMap() + val infos: MutableSet = HashSet() + + @Before + fun start() { + val nodes = startNodesWithPort(partiesList) + nodes.forEach { it.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting. + nodes.forEach { + infos.add(it.info) + addressesMap[it.info.legalIdentity.name] = it.info.addresses[0] + it.stop() // We want them to communicate with NetworkMapService to save data to cache. + } + } + + @Test + fun `get nodes by owning key and by name, no network map service`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val netCache = alice.services.networkMapCache as PersistentNetworkMapCache + alice.database.transaction { + val res = netCache.getNodeByLegalIdentity(alice.info.legalIdentity) + assertEquals(alice.info, res) + val res2 = netCache.getNodeByLegalName(DUMMY_NOTARY.name) + assertEquals(infos.filter { it.legalIdentity.name == DUMMY_NOTARY.name }.singleOrNull(), res2) + } + } + + @Test + fun `get nodes by address no network map service`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val netCache = alice.services.networkMapCache as PersistentNetworkMapCache + alice.database.transaction { + val res = netCache.getNodeByAddress(alice.info.addresses[0]) + assertEquals(alice.info, res) + } + } + + @Test + fun `restart node with DB map cache and no network map`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val partyNodes = alice.services.networkMapCache.partyNodes + assert(NetworkMapService.type !in alice.info.advertisedServices.map { it.info.type }) + assertEquals(null, alice.inNodeNetworkMapService) + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + + @Test + fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() { + val parties = partiesList.subList(1, partiesList.size) + val nodes = startNodesWithPort(parties, noNetworkMap = true) + assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) + nodes.forEach { + val partyNodes = it.services.networkMapCache.partyNodes + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + checkConnectivity(nodes) + } + + @Test + fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() { + val parties = partiesList.subList(1, partiesList.size) + val nodes = startNodesWithPort(parties, noNetworkMap = false) + assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) + nodes.forEach { + val partyNodes = it.services.networkMapCache.partyNodes + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + checkConnectivity(nodes) + } + + @Test + fun `start node and network map communicate`() { + val parties = partiesList.subList(0, 2) + val nodes = startNodesWithPort(parties, noNetworkMap = false) + checkConnectivity(nodes) + } + + @Test + fun `start node without networkMapService and no database - fail`() { + assertFails { startNode(CHARLIE.name, noNetworkMap = true).getOrThrow(2.seconds) } + } + + @Test + fun `new node joins network without network map started`() { + val parties = partiesList.subList(1, partiesList.size) + // Start 2 nodes pointing at network map, but don't start network map service. + val otherNodes = startNodesWithPort(parties, noNetworkMap = false) + otherNodes.forEach { node -> + assert(infos.any { it.legalIdentity == node.info.legalIdentity }) + } + // Start node that is not in databases of other nodes. Point to NMS. Which has't started yet. + val charlie = startNodesWithPort(listOf(CHARLIE), noNetworkMap = false)[0] + otherNodes.forEach { + assert(charlie.info.legalIdentity !in it.services.networkMapCache.partyNodes.map { it.legalIdentity }) + } + // Start Network Map and see that charlie node appears in caches. + val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0] + nms.startupComplete.get() + assert(nms.inNodeNetworkMapService != null) + assert(infos.any {it.legalIdentity == nms.info.legalIdentity}) + otherNodes.forEach { + assert(nms.info.legalIdentity in it.services.networkMapCache.partyNodes.map { it.legalIdentity }) + } + charlie.nodeReadyFuture.get() // Finish registration. + checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. + val cacheA = otherNodes[0].services.networkMapCache.partyNodes + val cacheB = otherNodes[1].services.networkMapCache.partyNodes + val cacheC = charlie.services.networkMapCache.partyNodes + assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap + assert(charlie.info.legalIdentity in cacheB.map { it.legalIdentity }) // Other nodes also fetched data from Network Map with node C. + assertEquals(cacheA.toSet(), cacheB.toSet()) + assertEquals(cacheA.toSet(), cacheC.toSet()) + } + + // HELPERS + // Helper function to restart nodes with the same host and port. + private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false): List { + return nodesToStart.map { party -> + val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap() + if (party == DUMMY_NOTARY) { + startNetworkMapNode(party.name, configOverrides = configOverrides) + } + else { + startNode(party.name, + configOverrides = configOverrides, + noNetworkMap = noNetworkMap, + waitForConnection = false).getOrThrow() + } + } + } + + // Check that nodes are functional, communicate each with each. + private fun checkConnectivity(nodes: List) { + nodes.forEach { node1 -> + nodes.forEach { node2 -> + node2.registerInitiatedFlow(SendBackFlow::class.java) + val resultFuture = node1.services.startFlow(SendFlow(node2.info.legalIdentity)).resultFuture + assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") + } + } + } + + @InitiatingFlow + private class SendFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): String { + println("SEND FLOW to $otherParty") + println("Party key ${otherParty.owningKey.toBase58String()}") + return sendAndReceive(otherParty, "Hi!").unwrap { it } + } + } + + @InitiatedBy(SendFlow::class) + private class SendBackFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + println("SEND BACK FLOW to $otherParty") + println("Party key ${otherParty.owningKey.toBase58String()}") + send(otherParty, "Hello!") + } + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 64d21ded83..a5084a5f42 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -220,6 +220,7 @@ class FlowFrameworkTests { node2.stop() node2.database.transaction { assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint + node2.services.networkMapCache.clearNetworkMapCache() } val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) node2.manuallyCloseDB() diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt index 547d29be7b..d726b836d1 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt @@ -85,7 +85,7 @@ class VisualiserViewModel { // bottom left: -23.2031,29.8406 // top right: 33.0469,64.3209 try { - return node.place.coordinate.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) + return node.place.coordinate!!.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) } catch(e: Exception) { throw Exception("Cannot project ${node.info.legalIdentity}", e) } diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt index c70c383102..b8fcc78b5b 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt @@ -261,7 +261,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, } } - val networkInitialisationFinished = mockNet.nodes.map { it.networkMapRegistrationFuture }.transpose() + val networkInitialisationFinished = mockNet.nodes.map { it.nodeReadyFuture }.transpose() fun start(): CordaFuture { mockNet.startNodes() diff --git a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index acec6efcb2..11f099dc5b 100644 --- a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -78,5 +78,4 @@ class DriverTests { assertThat(debugLinesPresent).isTrue() } } - } diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt similarity index 96% rename from node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt rename to test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt index 1994ed04c8..ff70a21606 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt @@ -1,4 +1,4 @@ -package net.corda.node.services +package net.corda.node.testing import com.codahale.metrics.MetricRegistry import net.corda.core.flows.FlowInitiator @@ -63,7 +63,7 @@ open class MockServiceHubInternal( override val clock: Clock get() = overrideClock ?: throw UnsupportedOperationException() override val myInfo: NodeInfo - get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1) // Required to get a dummy platformVersion when required for tests. + get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1, serial = 1L) // Required to get a dummy platformVersion when required for tests. override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val rpcFlows: List>> get() = throw UnsupportedOperationException() diff --git a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt index de7c78d5cc..c22b5fb32d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -223,7 +223,8 @@ sealed class PortAllocation { * } * * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture] - * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service. + * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service or + * loaded node data from database. * * The driver implicitly bootstraps a [NetworkMapService]. * @@ -698,7 +699,8 @@ class DriverDSL( // node port numbers to be shifted, so all demos and docs need to be updated accordingly. "webAddress" to webAddress.toString(), "p2pAddress" to dedicatedNetworkMapAddress.toString(), - "useTestClock" to useTestClock + "useTestClock" to useTestClock, + "extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString()) ) ) return startNodeInternal(config, webAddress, startInProcess) @@ -716,7 +718,7 @@ class DriverDSL( ) return nodeAndThreadFuture.flatMap { (node, thread) -> establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, openFuture()).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().map { + rpc.waitUntilNetworkReady().map { NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread) } } @@ -731,9 +733,9 @@ class DriverDSL( } // We continue to use SSL enabled port for RPC when its for node user. establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc -> - // Call waitUntilRegisteredWithNetworkMap in background in case RPC is failing over: + // Call waitUntilNetworkReady in background in case RPC is failing over: val networkMapFuture = executorService.fork { - rpc.waitUntilRegisteredWithNetworkMap() + rpc.waitUntilNetworkReady() }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt index ae80cc56da..75cdade8e0 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt @@ -4,11 +4,11 @@ import co.paralleluniverse.common.util.VisibleForTesting import net.corda.core.crypto.entropyToKeyPair import net.corda.core.identity.Party import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub import net.corda.core.node.services.NetworkMapCache import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NonEmptySet -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.testing.getTestPartyAndCertificate import net.corda.testing.getTestX509Name import rx.Observable @@ -18,7 +18,7 @@ import java.math.BigInteger /** * Network map cache with no backing map service. */ -class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serviceHub) { +class MockNetworkMapCache(serviceHub: ServiceHubInternal) : PersistentNetworkMapCache(serviceHub) { private companion object { val BANK_C = getTestPartyAndCertificate(getTestX509Name("Bank C"), entropyToKeyPair(BigInteger.valueOf(1000)).public) val BANK_D = getTestPartyAndCertificate(getTestX509Name("Bank D"), entropyToKeyPair(BigInteger.valueOf(2000)).public) @@ -29,8 +29,8 @@ class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serv override val changed: Observable = PublishSubject.create() init { - val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1) - val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1) + val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1, serial = 1L) + val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1, serial = 1L) registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB runWithoutMapService() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index eae9105499..f014135766 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -212,7 +212,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun noNetworkMapConfigured() = doneFuture(Unit) // There is no need to slow down the unit tests by initialising CityDatabase - override fun findMyLocation(): WorldMapLocation? = null + open fun findMyLocation(): WorldMapLocation? = null // It's left only for NetworkVisualiserSimulation override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) @@ -302,7 +302,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, overrideServices, entropyRoot).apply { if (start) { start() - if (threadPerNode && networkMapAddress != null) networkMapRegistrationFuture.getOrThrow() // XXX: What about manually-started nodes? + if (threadPerNode && networkMapAddress != null) nodeReadyFuture.getOrThrow() // XXX: What about manually-started nodes? } _nodes.add(this) } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index f68746fc1a..ab496c749a 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -160,7 +160,7 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub { override val clock: Clock get() = Clock.systemUTC() override val myInfo: NodeInfo get() { val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public) - return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1) + return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1, serial = 1L) } override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index 8bd949ca60..6f5f55a365 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.commonName import net.corda.core.crypto.getX509Name +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.map @@ -19,6 +20,8 @@ import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.configOf import net.corda.node.services.config.plus +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.nodeapi.User @@ -76,6 +79,13 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { portNotBoundChecks.transpose().getOrThrow() } + /** + * Clear network map data from nodes' databases. + */ + fun clearAllNodeInfoDb() { + nodes.forEach { it.services.networkMapCache.clearNetworkMapCache() } + } + /** * You can use this method to start the network map node in a more customised manner. Otherwise it * will automatically be started with the default parameters. @@ -85,30 +95,44 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), configOverrides: Map = emptyMap()): Node { - check(_networkMapNode == null) - return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply { + check(_networkMapNode == null || _networkMapNode!!.info.legalIdentity.name == legalName) + return startNodeInternal(legalName, platformVersion, advertisedServices + ServiceInfo(NetworkMapService.type), rpcUsers, configOverrides).apply { _networkMapNode = this } } + @JvmOverloads fun startNode(legalName: X500Name, platformVersion: Int = 1, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap()): CordaFuture { + configOverrides: Map = emptyMap(), + noNetworkMap: Boolean = false, + waitForConnection: Boolean = true): CordaFuture { + val networkMapConf = if (noNetworkMap) { + // Nonexistent network map service address. + mapOf( + "networkMapService" to mapOf( + "address" to "localhost:10000", + "legalName" to networkMapNode.info.legalIdentity.name.toString() + ) + ) + } else { + mapOf( + "networkMapService" to mapOf( + "address" to networkMapNode.configuration.p2pAddress.toString(), + "legalName" to networkMapNode.info.legalIdentity.name.toString() + ) + ) + } val node = startNodeInternal( legalName, platformVersion, advertisedServices, rpcUsers, - mapOf( - "networkMapService" to mapOf( - "address" to networkMapNode.configuration.p2pAddress.toString(), - "legalName" to networkMapNode.info.legalIdentity.name.toString() - ) - ) + configOverrides - ) - return node.networkMapRegistrationFuture.map { node } + networkMapConf + configOverrides, + noNetworkMap) + return if (waitForConnection) node.nodeReadyFuture.map { node } else doneFuture(node) } fun startNotaryCluster(notaryName: X500Name, @@ -149,18 +173,21 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { platformVersion: Int, advertisedServices: Set, rpcUsers: List, - configOverrides: Map): Node { + configOverrides: Map, + noNetworkMap: Boolean = false): Node { val baseDirectory = baseDirectory(legalName).createDirectories() val localPort = getFreeLocalPorts("localhost", 2) + val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() val config = ConfigHelper.loadConfig( baseDirectory = baseDirectory, allowMissingConfig = true, configOverrides = configOf( "myLegalName" to legalName.toString(), - "p2pAddress" to localPort[0].toString(), + "p2pAddress" to p2pAddress, "rpcAddress" to localPort[1].toString(), "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, - "rpcUsers" to rpcUsers.map { it.toMap() } + "rpcUsers" to rpcUsers.map { it.toMap() }, + "noNetworkMap" to noNetworkMap ) + configOverrides ) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt index f10d1b12cf..a3de30fef1 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -15,8 +15,8 @@ import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase @@ -42,7 +42,8 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity)) val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1) // TODO: We should have a dummy service hub rather than change behaviour in tests - val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, InMemoryNetworkMapCache(serviceHub = null), userService) + val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, + MockNetworkMapCache(serviceHub = object : MockServiceHubInternal(database = database, configuration = config) {}), userService) val networkMapRegistrationFuture = openFuture() val network = database.transaction { NodeMessagingClient( diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt index cfec8b3757..1bf02e2a52 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt @@ -1,6 +1,8 @@ package net.corda.demobench.model import com.typesafe.config.Config +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType import net.corda.core.utilities.parseNetworkHostAndPort import org.bouncycastle.asn1.x500.X500Name import tornadofx.* @@ -51,14 +53,14 @@ class InstallFactory : Controller() { return port } - private fun Config.parseExtraServices(path: String): List { - val services = serviceController.services.toSortedSet() + private fun Config.parseExtraServices(path: String): MutableList { + val services = serviceController.services.toSortedSet() + ServiceInfo(ServiceType.networkMap).toString() return this.getStringList(path) .filter { !it.isNullOrEmpty() } .map { svc -> require(svc in services, { "Unknown service '$svc'." }) svc - }.toList() + }.toMutableList() } } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt index 64406da824..fc13438570 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt @@ -16,7 +16,7 @@ class NodeConfig( val rpcPort: Int, val webPort: Int, val h2Port: Int, - val extraServices: List, + val extraServices: MutableList = mutableListOf(), val users: List = listOf(defaultUser), var networkMap: NetworkMapConfig? = null ) : NetworkMapConfig(legalName, p2pPort), HasPlugins { diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt index 0b484aacda..18750dd4ae 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt @@ -1,6 +1,8 @@ package net.corda.demobench.model import net.corda.core.crypto.getX509Name +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType import net.corda.demobench.plugin.PluginController import net.corda.demobench.pty.R3Pty import tornadofx.* @@ -54,15 +56,15 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { baseDir, getX509Name( myLegalName = nodeData.legalName.value.trim(), - email = "corda@city.${location.countryCode.toLowerCase()}.example", - nearestCity = location.description, + email = "corda@city.${location.countryCode!!.toLowerCase()}.example", + nearestCity = location.description!!, country = location.countryCode ), nodeData.p2pPort.value, nodeData.rpcPort.value, nodeData.webPort.value, nodeData.h2Port.value, - nodeData.extraServices.value + nodeData.extraServices.toMutableList() ) if (nodes.putIfAbsent(config.key, config) != null) { @@ -98,6 +100,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { if (hasNetworkMap()) { config.networkMap = networkMapConfig } else { + config.extraServices.add(ServiceInfo(ServiceType.networkMap).toString()) networkMapConfig = config log.info("Network map provided by: ${config.legalName}") } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt index cf9822f969..71566e98a4 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt @@ -220,7 +220,7 @@ class NodeTabView : Fragment() { imageview { image = flags.get()[it.countryCode] } - label(it.description) + label(it.description!!) alignment = Pos.CENTER_LEFT } } diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt index 6fee64384a..e73cbef9c7 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt @@ -82,7 +82,7 @@ class NodeConfigTest { @Test fun `test services`() { - val config = createConfig(services = listOf("my.service")) + val config = createConfig(services = mutableListOf("my.service")) assertEquals(listOf("my.service"), config.extraServices) } @@ -107,13 +107,13 @@ class NodeConfigTest { @Test fun `test cash issuer`() { - val config = createConfig(services = listOf("corda.issuer.GBP")) + val config = createConfig(services = mutableListOf("corda.issuer.GBP")) assertTrue(config.isCashIssuer) } @Test fun `test not cash issuer`() { - val config = createConfig(services = listOf("corda.issuerubbish")) + val config = createConfig(services = mutableListOf("corda.issuerubbish")) assertFalse(config.isCashIssuer) } @@ -138,7 +138,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) assertEquals(prettyPrint("{" @@ -164,7 +164,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -193,7 +193,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -223,7 +223,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -257,7 +257,7 @@ class NodeConfigTest { rpcPort: Int = -1, webPort: Int = -1, h2Port: Int = -1, - services: List = listOf("extra.service"), + services: MutableList = mutableListOf("extra.service"), users: List = listOf(user("guest")) ) = NodeConfig( baseDir, diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt index d476ccc93a..40cee11f4a 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt @@ -170,7 +170,7 @@ class NodeControllerTest { rpcPort: Int = -1, webPort: Int = -1, h2Port: Int = -1, - services: List = listOf("extra.service"), + services: MutableList = mutableListOf("extra.service"), users: List = listOf(user("guest")) ) = NodeConfig( baseDir, diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt index 23dcb222cf..2c33129de9 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt @@ -27,10 +27,13 @@ import javafx.util.Duration import net.corda.client.jfx.model.* import net.corda.client.jfx.utils.* import net.corda.core.contracts.ContractState +import net.corda.core.crypto.locationOrNull import net.corda.core.crypto.toBase58String import net.corda.core.identity.Party +import net.corda.core.node.CityDatabase import net.corda.core.node.NodeInfo import net.corda.core.node.ScreenCoordinate +import net.corda.core.node.WorldMapLocation import net.corda.explorer.formatters.PartyNameFormatter import net.corda.explorer.model.CordaView import tornadofx.* @@ -99,7 +102,7 @@ class Network : CordaView() { copyableLabel(SimpleObjectProperty(node.legalIdentity.owningKey.toBase58String())).apply { minWidth = 400.0 } } row("Services :") { label(node.advertisedServices.map { it.info }.joinToString(", ")) } - node.worldMapLocation?.apply { row("Location :") { label(this@apply.description) } } + node.getWorldMapLocation()?.apply { row("Location :") { label(this@apply.description!!) } } } } setOnMouseClicked { @@ -123,7 +126,7 @@ class Network : CordaView() { contentDisplay = ContentDisplay.TOP val coordinate = Bindings.createObjectBinding({ // These coordinates are obtained when we generate the map using TileMill. - node.worldMapLocation?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) + node.getWorldMapLocation()?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) }, arrayOf(mapPane.widthProperty(), mapPane.heightProperty())) // Center point of the label. layoutXProperty().bind(coordinate.map { it.screenX - width / 2 }) diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt index 254b8aacb2..7e70be42b0 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt @@ -185,7 +185,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List