From 472ecc65c633bb40ebe5650b69af2e7a3b90bc6a Mon Sep 17 00:00:00 2001 From: Katarzyna Streich Date: Thu, 31 Aug 2017 11:00:11 +0100 Subject: [PATCH] NetworkMapCache database backed (#1135) Work on database backed NetworkMapCache Make NodeInfo JPA entity. Enable node startup with it's database network map cache. Fix schema. Make node not wait for finishing network map service registration if it successfully loaded data from database. Add tests for startup without NetworkMapService. * Rename networkMapRegistrationFuture Change networkMapRegistrationFuture to nodeReadyFuture, it no longer indicates the NetworkMapService registration, because we are able to run network without map service configured. * Partially integrate database into NetworkMapCache Full integrtion will come with service removal. Move MockServiceHubInternal to net.corda.node.testing * Add workaround to transaction scope race Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing network map registration on network map node). * Remove WorldMapLocation from NodeInfo Infer the node's location based on X500 name Add serial number on NodeInfo For tests of running without NetworkMap, start nodes with nonexistent NetworkMap address Make clearNetworkMapCache callable via RPC. --- .../net/corda/core/messaging/CordaRPCOps.kt | 7 +- .../kotlin/net/corda/core/node/NodeInfo.kt | 20 +- .../core/node/services/NetworkMapCache.kt | 13 +- .../net/corda/core/schemas/NodeInfoSchema.kt | 142 ++++++++ docs/source/api-persistence.rst | 6 +- .../corda/docs/IntegrationTestingTutorial.kt | 4 +- .../services/messaging/P2PMessagingTest.kt | 2 +- .../services/messaging/P2PSecurityTest.kt | 2 +- .../net/corda/node/internal/AbstractNode.kt | 51 ++- .../corda/node/internal/CordaRPCOpsImpl.kt | 6 +- .../kotlin/net/corda/node/internal/Node.kt | 12 +- .../net/corda/node/internal/NodeStartup.kt | 2 +- .../node/services/api/ServiceHubInternal.kt | 3 + .../messaging/ArtemisMessagingServer.kt | 1 + .../network/InMemoryNetworkMapCache.kt | 185 ---------- .../network/PersistentNetworkMapCache.kt | 339 ++++++++++++++++++ .../node/services/schema/NodeSchemaService.kt | 2 + .../statemachine/StateMachineManager.kt | 3 +- .../events/NodeSchedulerServiceTest.kt | 2 +- .../messaging/ArtemisMessagingTests.kt | 7 +- ...MapCacheTest.kt => NetworkMapCacheTest.kt} | 23 +- .../network/PersistentNetworkMapCacheTest.kt | 194 ++++++++++ .../statemachine/FlowFrameworkTests.kt | 1 + .../net/corda/netmap/VisualiserViewModel.kt | 2 +- .../net/corda/netmap/simulation/Simulation.kt | 2 +- .../net/corda/testing/driver/DriverTests.kt | 1 - .../node/testing}/MockServiceHubInternal.kt | 4 +- .../kotlin/net/corda/testing/driver/Driver.kt | 12 +- .../corda/testing/node/MockNetworkMapCache.kt | 10 +- .../kotlin/net/corda/testing/node/MockNode.kt | 4 +- .../net/corda/testing/node/MockServices.kt | 2 +- .../net/corda/testing/node/NodeBasedTest.kt | 55 ++- .../net/corda/testing/node/SimpleNode.kt | 5 +- .../corda/demobench/model/InstallFactory.kt | 8 +- .../net/corda/demobench/model/NodeConfig.kt | 2 +- .../corda/demobench/model/NodeController.kt | 9 +- .../net/corda/demobench/views/NodeTabView.kt | 2 +- .../corda/demobench/model/NodeConfigTest.kt | 16 +- .../demobench/model/NodeControllerTest.kt | 2 +- .../net/corda/explorer/views/Network.kt | 7 +- .../kotlin/net/corda/loadtest/LoadTest.kt | 2 +- 41 files changed, 885 insertions(+), 287 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt rename node/src/test/kotlin/net/corda/node/services/network/{InMemoryNetworkMapCacheTest.kt => NetworkMapCacheTest.kt} (73%) create mode 100644 node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt rename {node/src/test/kotlin/net/corda/node/services => test-utils/src/main/kotlin/net/corda/node/testing}/MockServiceHubInternal.kt (96%) diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 3133e1c3b0..e7734161f4 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -257,7 +257,7 @@ interface CordaRPCOps : RPCOps { * complete with an exception if it is unable to. */ @RPCReturnsObservables - fun waitUntilRegisteredWithNetworkMap(): CordaFuture + fun waitUntilNetworkReady(): CordaFuture // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of // the node's state locally and query that directly. @@ -299,6 +299,11 @@ interface CordaRPCOps : RPCOps { * @return the node info if available. */ fun nodeIdentityFromParty(party: AbstractParty): NodeInfo? + + /** + * Clear all network map data from local node cache. + */ + fun clearNetworkMapCache() } inline fun CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), diff --git a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt index d7f30a4f50..c802831d0f 100644 --- a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt +++ b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt @@ -1,12 +1,16 @@ package net.corda.core.node +import net.corda.core.crypto.locationOrNull import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.NodeInfoSchemaV1 import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NonEmptySet +import net.corda.core.serialization.serialize /** * Information for an advertised service including the service specific identity information. @@ -21,11 +25,13 @@ data class ServiceEntry(val info: ServiceInfo, val identity: PartyAndCertificate // TODO We currently don't support multi-IP/multi-identity nodes, we only left slots in the data structures. @CordaSerializable data class NodeInfo(val addresses: List, - val legalIdentityAndCert: PartyAndCertificate, //TODO This field will be removed in future PR which gets rid of services. - val legalIdentitiesAndCerts: NonEmptySet, + // TODO After removing of services these two fields will be merged together and made NonEmptySet. + val legalIdentityAndCert: PartyAndCertificate, + val legalIdentitiesAndCerts: Set, val platformVersion: Int, val advertisedServices: List = emptyList(), - val worldMapLocation: WorldMapLocation? = null) { + val serial: Long +) { init { require(advertisedServices.none { it.identity == legalIdentityAndCert }) { "Service identities must be different from node legal identity" @@ -37,4 +43,12 @@ data class NodeInfo(val addresses: List, fun serviceIdentities(type: ServiceType): List { return advertisedServices.mapNotNull { if (it.info.type.isSubTypeOf(type)) it.identity.party else null } } + + /** + * Uses node's owner X500 name to infer the node's location. Used in Explorer in map view. + */ + fun getWorldMapLocation(): WorldMapLocation? { + val nodeOwnerLocation = legalIdentity.name.locationOrNull + return nodeOwnerLocation?.let { CityDatabase[it] } + } } diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 33fbd1f10e..3f854a44fc 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.randomOrNull import net.corda.core.messaging.DataFeed import net.corda.core.node.NodeInfo import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.NetworkHostAndPort import org.bouncycastle.asn1.x500.X500Name import rx.Observable import java.security.PublicKey @@ -44,7 +45,7 @@ interface NetworkMapCache { /** Tracks changes to the network map cache */ val changed: Observable /** Future to track completion of the NetworkMapService registration. */ - val mapServiceRegistered: CordaFuture + val nodeReady: CordaFuture /** * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the @@ -76,7 +77,10 @@ interface NetworkMapCache { fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? /** Look up the node info for a legal name. */ - fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal } + fun getNodeByLegalName(principal: X500Name): NodeInfo? + + /** Look up the node info for a host and port. */ + fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? /** * In general, nodes can advertise multiple identities: a legal identity, and separate identities for each of @@ -144,4 +148,9 @@ interface NetworkMapCache { "Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.") return notary.advertisedServices.any { it.info.type.isValidatingNotary() } } + + /** + * Clear all network map data from local node cache. + */ + fun clearNetworkMapCache() } diff --git a/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt b/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt new file mode 100644 index 0000000000..f0d21aeaf3 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/schemas/NodeInfoSchema.kt @@ -0,0 +1,142 @@ +package net.corda.core.schemas + +import net.corda.core.crypto.toBase58String +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.node.NodeInfo +import net.corda.core.node.ServiceEntry +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import java.io.Serializable +import java.security.cert.CertPath +import javax.persistence.CascadeType +import javax.persistence.Column +import javax.persistence.ElementCollection +import javax.persistence.Embeddable +import javax.persistence.EmbeddedId +import javax.persistence.Entity +import javax.persistence.GeneratedValue +import javax.persistence.Id +import javax.persistence.JoinColumn +import javax.persistence.JoinTable +import javax.persistence.Lob +import javax.persistence.ManyToMany +import javax.persistence.OneToMany +import javax.persistence.Table + +object NodeInfoSchema + +object NodeInfoSchemaV1 : MappedSchema( + schemaFamily = NodeInfoSchema.javaClass, + version = 1, + mappedTypes = listOf(PersistentNodeInfo::class.java, DBPartyAndCertificate::class.java, DBHostAndPort::class.java) +) { + @Entity + @Table(name = "node_infos") + class PersistentNodeInfo( + @Id + @GeneratedValue + @Column(name = "node_info_id") + var id: Int, + + @Column(name = "addresses") + @OneToMany(cascade = arrayOf(CascadeType.ALL), orphanRemoval = true) + val addresses: List, + + @Column(name = "legal_identities_certs") + @ManyToMany(cascade = arrayOf(CascadeType.ALL)) + @JoinTable(name = "link_nodeinfo_party", + joinColumns = arrayOf(JoinColumn(name="node_info_id")), + inverseJoinColumns = arrayOf(JoinColumn(name="party_name"))) + val legalIdentitiesAndCerts: Set, + + @Column(name = "platform_version") + val platformVersion: Int, + + @Column(name = "advertised_services") + @ElementCollection + var advertisedServices: List = emptyList(), + + /** + * serial is an increasing value which represents the version of [NodeInfo]. + * Not expected to be sequential, but later versions of the registration must have higher values + * Similar to the serial number on DNS records. + */ + @Column(name = "serial") + val serial: Long + ) { + fun toNodeInfo(): NodeInfo { + return NodeInfo( + this.addresses.map { it.toHostAndPort() }, + this.legalIdentitiesAndCerts.filter { it.isMain }.single().toLegalIdentityAndCert(), // TODO Workaround, it will be changed after PR with services removal. + this.legalIdentitiesAndCerts.filter { !it.isMain }.map { it.toLegalIdentityAndCert() }.toSet(), + this.platformVersion, + this.advertisedServices.map { + it.serviceEntry?.deserialize() ?: throw IllegalStateException("Service entry shouldn't be null") + }, + this.serial + ) + } + } + + @Embeddable + data class PKHostAndPort( + val host: String? = null, + val port: Int? = null + ) : Serializable + + @Entity + data class DBHostAndPort( + @EmbeddedId + private val pk: PKHostAndPort + ) { + companion object { + fun fromHostAndPort(hostAndPort: NetworkHostAndPort) = DBHostAndPort( + PKHostAndPort(hostAndPort.host, hostAndPort.port) + ) + } + fun toHostAndPort(): NetworkHostAndPort { + return NetworkHostAndPort(this.pk.host!!, this.pk.port!!) + } + } + + @Embeddable // TODO To be removed with services. + data class DBServiceEntry( + @Column(length = 65535) + val serviceEntry: ByteArray? = null + ) + + /** + * PartyAndCertificate entity (to be replaced by referencing final Identity Schema). + */ + @Entity + @Table(name = "node_info_party_cert") + data class DBPartyAndCertificate( + @Id + @Column(name = "owning_key", length = 65535, nullable = false) + val owningKey: String, + + //@Id // TODO Do we assume that names are unique? Note: We can't have it as Id, because our toString on X500 is inconsistent. + @Column(name = "party_name", nullable = false) + val name: String, + + @Column(name = "certificate") + @Lob + val certificate: ByteArray, + + @Column(name = "certificate_path") + @Lob + val certPath: ByteArray, + + val isMain: Boolean, + + @ManyToMany(mappedBy = "legalIdentitiesAndCerts", cascade = arrayOf(CascadeType.ALL)) // ManyToMany because of distributed services. + private val persistentNodeInfos: Set = emptySet() + ) { + constructor(partyAndCert: PartyAndCertificate, isMain: Boolean = false) + : this(partyAndCert.party.owningKey.toBase58String(), partyAndCert.party.name.toString(), partyAndCert.certificate.serialize().bytes, partyAndCert.certPath.serialize().bytes, isMain) + fun toLegalIdentityAndCert(): PartyAndCertificate { + return PartyAndCertificate(certPath.deserialize()) + } + } +} diff --git a/docs/source/api-persistence.rst b/docs/source/api-persistence.rst index 8844f80b4a..fbfb0c5d98 100644 --- a/docs/source/api-persistence.rst +++ b/docs/source/api-persistence.rst @@ -17,9 +17,9 @@ The ORM mapping is specified using the `Java Persistence API (dummyTopic, sessionId).getOrThrow(5.seconds) - assertThat(requestsReceived.get()).isGreaterThanOrEqualTo(2) assertThat(response).isEqualTo(responseMessage) } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 0e32802348..788d76ae00 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -67,7 +67,7 @@ class P2PSecurityTest : NodeBasedTest() { private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): CordaFuture { val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public) - val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1) + val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1, serial = 1) val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress) return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 146f391689..c21f7bd800 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -12,6 +12,7 @@ import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.* +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps @@ -39,7 +40,7 @@ import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.sendRequest -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NetworkMapService.RegistrationResponse @@ -138,10 +139,11 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, var isPreviousCheckpointsPresent = false private set - protected val _networkMapRegistrationFuture = openFuture() - /** Completes once the node has successfully registered with the network map service */ - val networkMapRegistrationFuture: CordaFuture - get() = _networkMapRegistrationFuture + protected val _nodeReadyFuture = openFuture() + /** Completes once the node has successfully registered with the network map service + * or has loaded network map data from local database */ + val nodeReadyFuture: CordaFuture + get() = _nodeReadyFuture /** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */ open val pluginRegistries: List by lazy { @@ -155,10 +157,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, /** The implementation of the [CordaRPCOps] interface used by this node. */ open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM. - open fun findMyLocation(): WorldMapLocation? { - return configuration.myLegalName.locationOrNull?.let { CityDatabase[it] } - } - open fun start() { require(!started) { "Node has already been started" } @@ -208,7 +206,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } runOnStop += network::stop - _networkMapRegistrationFuture.captureLater(registerWithNetworkMapIfConfigured()) + } + // If we successfully loaded network data from database, we set this future to Unit. + _nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured()) + database.transaction { smm.start() // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } @@ -483,9 +484,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo { val advertisedServiceEntries = makeServiceEntries() - val allIdentities = (advertisedServiceEntries.map { it.identity } + legalIdentity).toNonEmptySet() + val allIdentities = advertisedServiceEntries.map { it.identity }.toSet() // TODO Add node's legalIdentity (after services removal). val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet. - return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, findMyLocation()) + return NodeInfo(addresses, legalIdentity, allIdentities, platformVersion, advertisedServiceEntries, platformClock.instant().toEpochMilli()) } /** @@ -580,7 +581,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, services.networkMapCache.runWithoutMapService() noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache } else { - registerWithNetworkMap() + val netMapRegistration = registerWithNetworkMap() + // We may want to start node immediately with database data and not wait for network map registration (but send it either way). + // So we are ready to go. + if (services.networkMapCache.loadDBSuccess) { + log.info("Node successfully loaded network map data from the database.") + doneFuture(Unit) + } else { + netMapRegistration + } } } @@ -606,7 +615,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // Register this node against the network val instant = platformClock.instant() val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD - val reg = NodeRegistration(info, instant.toEpochMilli(), ADD, expires) + val reg = NodeRegistration(info, info.serial, ADD, expires) val request = RegistrationRequest(reg.toWire(services.keyManagementService, info.legalIdentityAndCert.owningKey), network.myAddress) return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress) } @@ -616,9 +625,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, /** This is overriden by the mock node implementation to enable operation without any network map service */ protected open fun noNetworkMapConfigured(): CordaFuture { - // TODO: There should be a consistent approach to configuration error exceptions. - throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " + - "has any other map node been configured.") + if (services.networkMapCache.loadDBSuccess) { + return doneFuture(Unit) + } else { + // TODO: There should be a consistent approach to configuration error exceptions. + throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " + + "has any other map node been configured.") + } } protected open fun makeKeyManagementService(identityService: IdentityService): KeyManagementService { @@ -754,7 +767,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val monitoringService = MonitoringService(MetricRegistry()) override val validatedTransactions = makeTransactionStorage() override val transactionVerifierService by lazy { makeTransactionVerifierService() } - override val networkMapCache by lazy { InMemoryNetworkMapCache(this) } + override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) } + override val networkMapCache by lazy { PersistentNetworkMapCache(this) } override val vaultService by lazy { NodeVaultService(this) } override val vaultQueryService by lazy { HibernateVaultQueryImpl(database.hibernateConfig, vaultService) @@ -776,7 +790,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val networkService: MessagingService get() = network override val clock: Clock get() = platformClock override val myInfo: NodeInfo get() = info - override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) } override val database: CordaPersistence get() = this@AbstractNode.database override val configuration: NodeConfiguration get() = this@AbstractNode.configuration diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 79ca9d2f22..e87359f0a6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -173,7 +173,7 @@ class CordaRPCOpsImpl( override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass) override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state) override fun currentNodeTime(): Instant = Instant.now(services.clock) - override fun waitUntilRegisteredWithNetworkMap() = services.networkMapCache.mapServiceRegistered + override fun waitUntilNetworkReady() = services.networkMapCache.nodeReady override fun partyFromAnonymous(party: AbstractParty): Party? = services.identityService.partyFromAnonymous(party) override fun partyFromKey(key: PublicKey) = services.identityService.partyFromKey(key) override fun partyFromX500Name(x500Name: X500Name) = services.identityService.partyFromX500Name(x500Name) @@ -182,6 +182,10 @@ class CordaRPCOpsImpl( override fun registeredFlows(): List = services.rpcFlows.map { it.name }.sorted() + override fun clearNetworkMapCache() { + services.networkMapCache.clearNetworkMapCache() + } + companion object { private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track()) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b30478e237..27016f1617 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -155,7 +155,7 @@ open class Node(override val configuration: FullNodeConfiguration, myIdentityOrNullIfNetworkMapService, serverThread, database, - networkMapRegistrationFuture, + nodeReadyFuture, services.monitoringService, advertisedAddress) } @@ -210,15 +210,17 @@ open class Node(override val configuration: FullNodeConfiguration, log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" } val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { - initialConnectAttempts = 5 - retryInterval = 5.seconds.toMillis() + initialConnectAttempts = 2 // TODO Public host discovery needs rewriting, as we may start nodes without network map, and we don't want to wait that long on startup. + retryInterval = 2.seconds.toMillis() retryIntervalMultiplier = 1.5 maxRetryInterval = 3.minutes.toMillis() } val clientFactory = try { locator.createSessionFactory() } catch (e: ActiveMQNotConnectedException) { - throw IOException("Unable to connect to the Network Map Service at $serverAddress for IP address discovery", e) + log.warn("Unable to connect to the Network Map Service at $serverAddress for IP address discovery. " + + "Using the provided \"${configuration.p2pAddress.host}\" as the advertised address.") + return null } val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) @@ -306,7 +308,7 @@ open class Node(override val configuration: FullNodeConfiguration, } super.start() - networkMapRegistrationFuture.thenMatch({ + nodeReadyFuture.thenMatch({ serverThread.execute { // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: // diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 5641cb8953..2d55019454 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -102,7 +102,7 @@ open class NodeStartup(val args: Array) { node.start() printPluginsAndServices(node) - node.networkMapRegistrationFuture.thenMatch({ + node.nodeReadyFuture.thenMatch({ val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0 // TODO: Replace this with a standard function to get an unambiguous rendering of the X.500 name. val name = node.info.legalIdentity.name.orgName ?: node.info.legalIdentity.name.commonName diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 69e010892b..4d2bbfccbf 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -53,6 +53,9 @@ interface NetworkMapCacheInternal : NetworkMapCache { /** For testing where the network map cache is manipulated marks the service as immediately ready. */ @VisibleForTesting fun runWithoutMapService() + + /** Indicates if loading network map data from database was successful. */ + val loadDBSuccess: Boolean } @CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 829c36bfbf..49ff18c942 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -338,6 +338,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. */ private fun updateBridgesOnNetworkChange(change: MapChange) { + log.debug { "Updating bridges on network map change: ${change.node}" } fun gatherAddresses(node: NodeInfo): Sequence { val peerAddress = getArtemisPeerAddress(node) val addresses = mutableListOf(peerAddress) diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt deleted file mode 100644 index 774e93f499..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ /dev/null @@ -1,185 +0,0 @@ -package net.corda.node.services.network - -import net.corda.core.concurrent.CordaFuture -import net.corda.core.identity.AbstractParty -import net.corda.core.identity.Party -import net.corda.core.internal.VisibleForTesting -import net.corda.core.internal.bufferUntilSubscribed -import net.corda.core.internal.concurrent.map -import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub -import net.corda.core.node.services.NetworkMapCache.MapChange -import net.corda.core.node.services.PartyInfo -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.NetworkCacheError -import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.messaging.createMessage -import net.corda.node.services.messaging.sendRequest -import net.corda.node.services.network.NetworkMapService.FetchMapResponse -import net.corda.node.services.network.NetworkMapService.SubscribeResponse -import net.corda.node.utilities.AddOrRemove -import net.corda.node.utilities.bufferUntilDatabaseCommit -import net.corda.node.utilities.wrapWithDatabaseTransaction -import rx.Observable -import rx.subjects.PublishSubject -import java.security.PublicKey -import java.security.SignatureException -import java.util.* -import javax.annotation.concurrent.ThreadSafe - -/** - * Extremely simple in-memory cache of the network map. - * - * @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather - * than the identity service directly, as this avoids problems with service start sequence (network map cache - * and identity services depend on each other). Should always be provided except for unit test cases. - */ -@ThreadSafe -open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : SingletonSerializeAsToken(), NetworkMapCacheInternal { - companion object { - val logger = loggerFor() - } - - override val partyNodes: List get() = registeredNodes.map { it.value } - override val networkMapNodes: List get() = getNodesWithService(NetworkMapService.type) - private val _changed = PublishSubject.create() - // We use assignment here so that multiple subscribers share the same wrapped Observable. - override val changed: Observable = _changed.wrapWithDatabaseTransaction() - private val changePublisher: rx.Observer get() = _changed.bufferUntilDatabaseCommit() - - private val _registrationFuture = openFuture() - override val mapServiceRegistered: CordaFuture get() = _registrationFuture - - private var registeredForPush = false - protected var registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) - - override fun getPartyInfo(party: Party): PartyInfo? { - val node = registeredNodes[party.owningKey] - if (node != null) { - return PartyInfo.Node(node) - } - for ((_, value) in registeredNodes) { - for (service in value.advertisedServices) { - if (service.identity.party == party) { - return PartyInfo.Service(service) - } - } - } - return null - } - - override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey] - override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { - val wellKnownParty = if (serviceHub != null) { - serviceHub.identityService.partyFromAnonymous(party) - } else { - party - } - - return wellKnownParty?.let { - getNodeByLegalIdentityKey(it.owningKey) - } - } - - override fun track(): DataFeed, MapChange> { - synchronized(_changed) { - return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) - } - } - - override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean, - ifChangedSinceVer: Int?): CordaFuture { - if (subscribe && !registeredForPush) { - // Add handler to the network, for updates received from the remote network map service. - network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ -> - try { - val req = message.data.deserialize() - val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, - data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) - network.send(ackMessage, req.replyTo) - processUpdatePush(req) - } catch(e: NodeMapError) { - logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") - } catch(e: Exception) { - logger.error("Exception processing update from network map service", e) - } - } - registeredForPush = true - } - - // Fetch the network map and register for updates at the same time - val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress) - val future = network.sendRequest(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) -> - // We may not receive any nodes back, if the map hasn't changed since the version specified - nodes?.forEach { processRegistration(it) } - Unit - } - _registrationFuture.captureLater(future.map { null }) - - return future - } - - override fun addNode(node: NodeInfo) { - synchronized(_changed) { - val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node) - if (previousNode == null) { - changePublisher.onNext(MapChange.Added(node)) - } else if (previousNode != node) { - changePublisher.onNext(MapChange.Modified(node, previousNode)) - } - } - } - - override fun removeNode(node: NodeInfo) { - synchronized(_changed) { - registeredNodes.remove(node.legalIdentity.owningKey) - changePublisher.onNext(MapChange.Removed(node)) - } - } - - /** - * Unsubscribes from updates from the given map service. - * @param service the network map service to listen to updates from. - */ - override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture { - // Fetch the network map and register for updates at the same time - val req = NetworkMapService.SubscribeRequest(false, network.myAddress) - // `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo. - val address = network.getAddressOfParty(PartyInfo.Node(service)) - val future = network.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map { - if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() - } - _registrationFuture.captureLater(future.map { null }) - return future - } - - fun processUpdatePush(req: NetworkMapService.Update) { - try { - val reg = req.wireReg.verified() - processRegistration(reg) - } catch (e: SignatureException) { - throw NodeMapError.InvalidSignature() - } - } - - private fun processRegistration(reg: NodeRegistration) { - // TODO: Implement filtering by sequence number, so we only accept changes that are - // more recent than the latest change we've processed. - when (reg.type) { - AddOrRemove.ADD -> addNode(reg.node) - AddOrRemove.REMOVE -> removeNode(reg.node) - } - } - - @VisibleForTesting - override fun runWithoutMapService() { - _registrationFuture.set(null) - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt new file mode 100644 index 0000000000..74a437817b --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -0,0 +1,339 @@ +package net.corda.node.services.network + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.crypto.parsePublicKeyBase58 +import net.corda.core.crypto.toBase58String +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.Party +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.NetworkMapCache.MapChange +import net.corda.core.node.services.PartyInfo +import net.corda.core.schemas.NodeInfoSchemaV1 +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor +import net.corda.node.services.api.NetworkCacheError +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.createMessage +import net.corda.node.services.messaging.sendRequest +import net.corda.node.services.network.NetworkMapService.FetchMapResponse +import net.corda.node.services.network.NetworkMapService.SubscribeResponse +import net.corda.node.utilities.AddOrRemove +import net.corda.node.utilities.DatabaseTransactionManager +import net.corda.node.utilities.bufferUntilDatabaseCommit +import net.corda.node.utilities.wrapWithDatabaseTransaction +import org.bouncycastle.asn1.x500.X500Name +import org.hibernate.Session +import rx.Observable +import rx.subjects.PublishSubject +import java.security.PublicKey +import java.security.SignatureException +import java.util.* +import javax.annotation.concurrent.ThreadSafe + +/** + * Extremely simple in-memory cache of the network map. + * + * @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather + * than the identity service directly, as this avoids problems with service start sequence (network map cache + * and identity services depend on each other). Should always be provided except for unit test cases. + */ +@ThreadSafe +open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) : SingletonSerializeAsToken(), NetworkMapCacheInternal { + companion object { + val logger = loggerFor() + } + + private var registeredForPush = false + // TODO Small explanation, partyNodes and registeredNodes is left in memory as it was before, because it will be removed in + // next PR that gets rid of services. These maps are used only for queries by service. + override val partyNodes: List get() = registeredNodes.map { it.value } + override val networkMapNodes: List get() = getNodesWithService(NetworkMapService.type) + private val _changed = PublishSubject.create() + // We use assignment here so that multiple subscribers share the same wrapped Observable. + override val changed: Observable = _changed.wrapWithDatabaseTransaction() + private val changePublisher: rx.Observer get() = _changed.bufferUntilDatabaseCommit() + + private val _registrationFuture = openFuture() + override val nodeReady: CordaFuture get() = _registrationFuture + protected val registeredNodes: MutableMap = Collections.synchronizedMap(HashMap()) + private var _loadDBSuccess: Boolean = false + override val loadDBSuccess get() = _loadDBSuccess + + init { + serviceHub.database.transaction { loadFromDB() } + } + + override fun getPartyInfo(party: Party): PartyInfo? { + val nodes = serviceHub.database.transaction { queryByIdentityKey(party.owningKey) } + if (nodes.size == 1 && nodes[0].legalIdentity == party) { + return PartyInfo.Node(nodes[0]) + } + for (node in nodes) { + for (service in node.advertisedServices) { + if (service.identity.party == party) { + return PartyInfo.Service(service) + } + } + } + return null + } + + // TODO See comment to queryByLegalName why it's left like that. + override fun getNodeByLegalName(principal: X500Name): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == principal } + //serviceHub!!.database.transaction { queryByLegalName(principal).firstOrNull() } + override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = + serviceHub.database.transaction { queryByIdentityKey(identityKey).firstOrNull() } + override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { + val wellKnownParty = serviceHub.identityService.partyFromAnonymous(party) + return wellKnownParty?.let { + getNodeByLegalIdentityKey(it.owningKey) + } + } + + override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = serviceHub.database.transaction { queryByAddress(address) } + + override fun track(): DataFeed, MapChange> { + synchronized(_changed) { + return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + } + } + + override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean, + ifChangedSinceVer: Int?): CordaFuture { + if (subscribe && !registeredForPush) { + // Add handler to the network, for updates received from the remote network map service. + network.addMessageHandler(NetworkMapService.PUSH_TOPIC) { message, _ -> + try { + val req = message.data.deserialize() + val ackMessage = network.createMessage(NetworkMapService.PUSH_ACK_TOPIC, + data = NetworkMapService.UpdateAcknowledge(req.mapVersion, network.myAddress).serialize().bytes) + network.send(ackMessage, req.replyTo) + processUpdatePush(req) + } catch(e: NodeMapError) { + logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") + } catch(e: Exception) { + logger.error("Exception processing update from network map service", e) + } + } + registeredForPush = true + } + + // Fetch the network map and register for updates at the same time + val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, network.myAddress) + val future = network.sendRequest(NetworkMapService.FETCH_TOPIC, req, networkMapAddress).map { (nodes) -> + // We may not receive any nodes back, if the map hasn't changed since the version specified + nodes?.forEach { processRegistration(it) } + Unit + } + _registrationFuture.captureLater(future.map { null }) + + return future + } + + override fun addNode(node: NodeInfo) { + synchronized(_changed) { + val previousNode = registeredNodes.put(node.legalIdentity.owningKey, node) + if (previousNode == null) { + serviceHub.database.transaction { + updateInfoDB(node) + changePublisher.onNext(MapChange.Added(node)) + } + } else if (previousNode != node) { + serviceHub.database.transaction { + updateInfoDB(node) + changePublisher.onNext(MapChange.Modified(node, previousNode)) + } + } + } + } + + override fun removeNode(node: NodeInfo) { + synchronized(_changed) { + registeredNodes.remove(node.legalIdentity.owningKey) + serviceHub.database.transaction { + removeInfoDB(node) + changePublisher.onNext(MapChange.Removed(node)) + } + } + } + + /** + * Unsubscribes from updates from the given map service. + * @param service the network map service to listen to updates from. + */ + override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture { + // Fetch the network map and register for updates at the same time + val req = NetworkMapService.SubscribeRequest(false, network.myAddress) + // `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo. + val address = network.getAddressOfParty(PartyInfo.Node(service)) + val future = network.sendRequest(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map { + if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() + } + _registrationFuture.captureLater(future.map { null }) + return future + } + + fun processUpdatePush(req: NetworkMapService.Update) { + try { + val reg = req.wireReg.verified() + processRegistration(reg) + } catch (e: SignatureException) { + throw NodeMapError.InvalidSignature() + } + } + + private fun processRegistration(reg: NodeRegistration) { + // TODO: Implement filtering by sequence number, so we only accept changes that are + // more recent than the latest change we've processed. + when (reg.type) { + AddOrRemove.ADD -> addNode(reg.node) + AddOrRemove.REMOVE -> removeNode(reg.node) + } + } + + @VisibleForTesting + override fun runWithoutMapService() { + _registrationFuture.set(null) + } + + // Changes related to NetworkMap redesign + // TODO It will be properly merged into network map cache after services removal. + + private inline fun createSession(block: (Session) -> T): T { + return DatabaseTransactionManager.current().session.let { block(it) } + } + + private fun getAllInfos(session: Session): List { + val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java) + criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java)) + return session.createQuery(criteria).resultList + } + + /** + * Load NetworkMap data from the database if present. Node can start without having NetworkMapService configured. + */ + private fun loadFromDB() { + logger.info("Loading network map from database...") + createSession { + val result = getAllInfos(it) + for (nodeInfo in result) { + try { + logger.info("Loaded node info: $nodeInfo") + val publicKey = parsePublicKeyBase58(nodeInfo.legalIdentitiesAndCerts.single { it.isMain }.owningKey) + val node = nodeInfo.toNodeInfo() + registeredNodes.put(publicKey, node) + changePublisher.onNext(MapChange.Added(node)) // Redeploy bridges after reading from DB on startup. + _loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready. + } catch (e: Exception) { + logger.warn("Exception parsing network map from the database.", e) + } + } + if (loadDBSuccess) { + _registrationFuture.set(null) // Useful only if we don't have NetworkMapService configured so StateMachineManager can start. + } + } + } + + private fun updateInfoDB(nodeInfo: NodeInfo) { + // TODO Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing + // network map registration on network map node) + val session = serviceHub.database.entityManagerFactory.withOptions().connection(serviceHub.database.dataSource.connection + .apply { + transactionIsolation = 1 + }).openSession() + + val tx = session.beginTransaction() + // TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo? + val info = findByIdentityKey(session, nodeInfo.legalIdentity.owningKey) + val nodeInfoEntry = generateMappedObject(nodeInfo) + if (info.isNotEmpty()) { + nodeInfoEntry.id = info[0].id + } + session.merge(nodeInfoEntry) + tx.commit() + session.close() + } + + private fun removeInfoDB(nodeInfo: NodeInfo) { + createSession { + val info = findByIdentityKey(it, nodeInfo.legalIdentity.owningKey).single() + it.remove(info) + } + } + + private fun findByIdentityKey(session: Session, identityKey: PublicKey): List { + val query = session.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKey = :owningKey", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("owningKey", identityKey.toBase58String()) + return query.resultList + } + + private fun queryByIdentityKey(identityKey: PublicKey): List { + createSession { + val result = findByIdentityKey(it, identityKey) + return result.map { it.toNodeInfo() } + } + } + + // TODO It's useless for now, because toString on X500 names is inconsistent and we have: + // C=ES,L=Madrid,O=Alice Corp,CN=Alice Corp + // CN=Alice Corp,O=Alice Corp,L=Madrid,C=ES + private fun queryByLegalName(name: X500Name): List { + createSession { + val query = it.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.name = :name", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("name", name.toString()) + val result = query.resultList + return result.map { it.toNodeInfo() } + } + } + + private fun queryByAddress(hostAndPort: NetworkHostAndPort): NodeInfo? { + createSession { + val query = it.createQuery( + "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.addresses a WHERE a.pk.host = :host AND a.pk.port = :port", + NodeInfoSchemaV1.PersistentNodeInfo::class.java) + query.setParameter("host", hostAndPort.host) + query.setParameter("port", hostAndPort.port) + val result = query.resultList + return if (result.isEmpty()) null + else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port") + } + } + + /** Object Relational Mapping support. */ + private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo { + return NodeInfoSchemaV1.PersistentNodeInfo( + id = 0, + addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) }, + legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.map { NodeInfoSchemaV1.DBPartyAndCertificate(it) }.toSet() + // TODO It's workaround to keep the main identity, will be removed in future PR getting rid of services. + + NodeInfoSchemaV1.DBPartyAndCertificate(nodeInfo.legalIdentityAndCert, isMain = true), + platformVersion = nodeInfo.platformVersion, + advertisedServices = nodeInfo.advertisedServices.map { NodeInfoSchemaV1.DBServiceEntry(it.serialize().bytes) }, + serial = nodeInfo.serial + ) + } + + override fun clearNetworkMapCache() { + serviceHub.database.transaction { + createSession { + val result = getAllInfos(it) + for (nodeInfo in result) it.remove(nodeInfo) + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 331ef4461d..0b53876af9 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.LinearState import net.corda.core.schemas.CommonSchemaV1 import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.NodeInfoSchemaV1 import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState import net.corda.core.serialization.SingletonSerializeAsToken @@ -58,6 +59,7 @@ class NodeSchemaService(customSchemas: Set = emptySet()) : SchemaS val requiredSchemas: Map = mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()), Pair(VaultSchemaV1, SchemaService.SchemaOptions()), + Pair(NodeInfoSchemaV1, SchemaService.SchemaOptions()), Pair(NodeServicesV1, SchemaService.SchemaOptions())) override val schemaOptions: Map = requiredSchemas.plus(customSchemas.map { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 4c2e3b0368..a20c6aaca3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -39,6 +39,7 @@ import net.corda.node.utilities.wrapWithDatabaseTransaction import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl import net.corda.nodeapi.internal.serialization.withTokenContext import org.apache.activemq.artemis.utils.ReusableLatch +import org.bouncycastle.asn1.x500.X500Name import org.slf4j.Logger import rx.Observable import rx.subjects.PublishSubject @@ -170,7 +171,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, checkQuasarJavaAgentPresence() restoreFibersFromCheckpoints() listenToLedgerTransactions() - serviceHub.networkMapCache.mapServiceRegistered.then { executor.execute(this::resumeRestoredFibers) } + serviceHub.networkMapCache.nodeReady.then { executor.execute(this::resumeRestoredFibers) } } private fun checkQuasarJavaAgentPresence() { diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 0280afd686..c8bdfb9287 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -11,12 +11,12 @@ import net.corda.core.node.services.VaultService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.days -import net.corda.node.services.MockServiceHubInternal import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.vault.NodeVaultService +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index c095eb86fb..3d1942c01c 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -12,9 +12,10 @@ import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase @@ -54,8 +55,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { var messagingClient: NodeMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null - // TODO: We should have a dummy service hub rather than change behaviour in tests - val networkMapCache = InMemoryNetworkMapCache(serviceHub = null) + lateinit var networkMapCache: PersistentNetworkMapCache val rpcOps = object : RPCOps { override val protocolVersion: Int get() = throw UnsupportedOperationException() @@ -71,6 +71,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() { LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), createIdentityService = ::makeTestIdentityService) networkMapRegistrationFuture = doneFuture(Unit) + networkMapCache = PersistentNetworkMapCache(serviceHub = object : MockServiceHubInternal(database, config) {}) } @After diff --git a/node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt similarity index 73% rename from node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt rename to node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt index 0b4cdb9a72..ff370836c0 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/InMemoryNetworkMapCacheTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapCacheTest.kt @@ -6,13 +6,14 @@ import net.corda.core.utilities.getOrThrow import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.testing.node.MockNetwork +import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test import java.math.BigInteger import kotlin.test.assertEquals -class InMemoryNetworkMapCacheTest { +class NetworkMapCacheTest { lateinit var mockNet: MockNetwork @Before @@ -47,9 +48,7 @@ class InMemoryNetworkMapCacheTest { // Node A currently knows only about itself, so this returns node A assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeA.info) - nodeA.database.transaction { - nodeA.services.networkMapCache.addNode(nodeB.info) - } + nodeA.services.networkMapCache.addNode(nodeB.info) // The details of node B write over those for node A assertEquals(nodeA.services.networkMapCache.getNodeByLegalIdentityKey(nodeA.info.legalIdentity.owningKey), nodeB.info) } @@ -68,4 +67,20 @@ class InMemoryNetworkMapCacheTest { // TODO: Should have a test case with anonymous lookup } + + @Test + fun `remove node from cache`() { + val nodes = mockNet.createSomeNodes(1) + val n0 = nodes.mapNode + val n1 = nodes.partyNodes[0] + val node0Cache = n0.services.networkMapCache as PersistentNetworkMapCache + mockNet.runNetwork() + n0.database.transaction { + assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) != null) + node0Cache.removeNode(n1.info) + assertThat(node0Cache.getNodeByLegalIdentity(n1.info.legalIdentity) == null) + assertThat(node0Cache.getNodeByLegalIdentity(n0.info.legalIdentity) != null) + assertThat(node0Cache.getNodeByLegalName(n1.info.legalIdentity.name) == null) + } + } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt new file mode 100644 index 0000000000..b5dae10615 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -0,0 +1,194 @@ +package net.corda.node.services.network + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.toBase58String +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.identity.Party +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.core.utilities.unwrap +import net.corda.node.internal.Node +import net.corda.testing.ALICE +import net.corda.testing.BOB +import net.corda.testing.CHARLIE +import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.node.NodeBasedTest +import org.assertj.core.api.Assertions.assertThat +import org.bouncycastle.asn1.x500.X500Name +import org.junit.Before +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.assertFails + +class PersistentNetworkMapCacheTest : NodeBasedTest() { + val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB) + val addressesMap: HashMap = HashMap() + val infos: MutableSet = HashSet() + + @Before + fun start() { + val nodes = startNodesWithPort(partiesList) + nodes.forEach { it.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting. + nodes.forEach { + infos.add(it.info) + addressesMap[it.info.legalIdentity.name] = it.info.addresses[0] + it.stop() // We want them to communicate with NetworkMapService to save data to cache. + } + } + + @Test + fun `get nodes by owning key and by name, no network map service`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val netCache = alice.services.networkMapCache as PersistentNetworkMapCache + alice.database.transaction { + val res = netCache.getNodeByLegalIdentity(alice.info.legalIdentity) + assertEquals(alice.info, res) + val res2 = netCache.getNodeByLegalName(DUMMY_NOTARY.name) + assertEquals(infos.filter { it.legalIdentity.name == DUMMY_NOTARY.name }.singleOrNull(), res2) + } + } + + @Test + fun `get nodes by address no network map service`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val netCache = alice.services.networkMapCache as PersistentNetworkMapCache + alice.database.transaction { + val res = netCache.getNodeByAddress(alice.info.addresses[0]) + assertEquals(alice.info, res) + } + } + + @Test + fun `restart node with DB map cache and no network map`() { + val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0] + val partyNodes = alice.services.networkMapCache.partyNodes + assert(NetworkMapService.type !in alice.info.advertisedServices.map { it.info.type }) + assertEquals(null, alice.inNodeNetworkMapService) + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + + @Test + fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() { + val parties = partiesList.subList(1, partiesList.size) + val nodes = startNodesWithPort(parties, noNetworkMap = true) + assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) + nodes.forEach { + val partyNodes = it.services.networkMapCache.partyNodes + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + checkConnectivity(nodes) + } + + @Test + fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() { + val parties = partiesList.subList(1, partiesList.size) + val nodes = startNodesWithPort(parties, noNetworkMap = false) + assert(nodes.all { it.inNodeNetworkMapService == null }) + assert(nodes.all { NetworkMapService.type !in it.info.advertisedServices.map { it.info.type } }) + nodes.forEach { + val partyNodes = it.services.networkMapCache.partyNodes + assertEquals(infos.size, partyNodes.size) + assertEquals(infos.map { it.legalIdentity }.toSet(), partyNodes.map { it.legalIdentity }.toSet()) + } + checkConnectivity(nodes) + } + + @Test + fun `start node and network map communicate`() { + val parties = partiesList.subList(0, 2) + val nodes = startNodesWithPort(parties, noNetworkMap = false) + checkConnectivity(nodes) + } + + @Test + fun `start node without networkMapService and no database - fail`() { + assertFails { startNode(CHARLIE.name, noNetworkMap = true).getOrThrow(2.seconds) } + } + + @Test + fun `new node joins network without network map started`() { + val parties = partiesList.subList(1, partiesList.size) + // Start 2 nodes pointing at network map, but don't start network map service. + val otherNodes = startNodesWithPort(parties, noNetworkMap = false) + otherNodes.forEach { node -> + assert(infos.any { it.legalIdentity == node.info.legalIdentity }) + } + // Start node that is not in databases of other nodes. Point to NMS. Which has't started yet. + val charlie = startNodesWithPort(listOf(CHARLIE), noNetworkMap = false)[0] + otherNodes.forEach { + assert(charlie.info.legalIdentity !in it.services.networkMapCache.partyNodes.map { it.legalIdentity }) + } + // Start Network Map and see that charlie node appears in caches. + val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0] + nms.startupComplete.get() + assert(nms.inNodeNetworkMapService != null) + assert(infos.any {it.legalIdentity == nms.info.legalIdentity}) + otherNodes.forEach { + assert(nms.info.legalIdentity in it.services.networkMapCache.partyNodes.map { it.legalIdentity }) + } + charlie.nodeReadyFuture.get() // Finish registration. + checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. + val cacheA = otherNodes[0].services.networkMapCache.partyNodes + val cacheB = otherNodes[1].services.networkMapCache.partyNodes + val cacheC = charlie.services.networkMapCache.partyNodes + assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap + assert(charlie.info.legalIdentity in cacheB.map { it.legalIdentity }) // Other nodes also fetched data from Network Map with node C. + assertEquals(cacheA.toSet(), cacheB.toSet()) + assertEquals(cacheA.toSet(), cacheC.toSet()) + } + + // HELPERS + // Helper function to restart nodes with the same host and port. + private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false): List { + return nodesToStart.map { party -> + val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap() + if (party == DUMMY_NOTARY) { + startNetworkMapNode(party.name, configOverrides = configOverrides) + } + else { + startNode(party.name, + configOverrides = configOverrides, + noNetworkMap = noNetworkMap, + waitForConnection = false).getOrThrow() + } + } + } + + // Check that nodes are functional, communicate each with each. + private fun checkConnectivity(nodes: List) { + nodes.forEach { node1 -> + nodes.forEach { node2 -> + node2.registerInitiatedFlow(SendBackFlow::class.java) + val resultFuture = node1.services.startFlow(SendFlow(node2.info.legalIdentity)).resultFuture + assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") + } + } + } + + @InitiatingFlow + private class SendFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): String { + println("SEND FLOW to $otherParty") + println("Party key ${otherParty.owningKey.toBase58String()}") + return sendAndReceive(otherParty, "Hi!").unwrap { it } + } + } + + @InitiatedBy(SendFlow::class) + private class SendBackFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + println("SEND BACK FLOW to $otherParty") + println("Party key ${otherParty.owningKey.toBase58String()}") + send(otherParty, "Hello!") + } + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 64d21ded83..a5084a5f42 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -220,6 +220,7 @@ class FlowFrameworkTests { node2.stop() node2.database.transaction { assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint + node2.services.networkMapCache.clearNetworkMapCache() } val node2b = mockNet.createNode(node1.network.myAddress, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) node2.manuallyCloseDB() diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt index 547d29be7b..d726b836d1 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/VisualiserViewModel.kt @@ -85,7 +85,7 @@ class VisualiserViewModel { // bottom left: -23.2031,29.8406 // top right: 33.0469,64.3209 try { - return node.place.coordinate.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) + return node.place.coordinate!!.project(view.mapImage.fitWidth, view.mapImage.fitHeight, 64.3209, 29.8406, -23.2031, 33.0469) } catch(e: Exception) { throw Exception("Cannot project ${node.info.legalIdentity}", e) } diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt index c70c383102..b8fcc78b5b 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/simulation/Simulation.kt @@ -261,7 +261,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, } } - val networkInitialisationFinished = mockNet.nodes.map { it.networkMapRegistrationFuture }.transpose() + val networkInitialisationFinished = mockNet.nodes.map { it.nodeReadyFuture }.transpose() fun start(): CordaFuture { mockNet.startNodes() diff --git a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index acec6efcb2..11f099dc5b 100644 --- a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -78,5 +78,4 @@ class DriverTests { assertThat(debugLinesPresent).isTrue() } } - } diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt similarity index 96% rename from node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt rename to test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt index 1994ed04c8..ff70a21606 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/test-utils/src/main/kotlin/net/corda/node/testing/MockServiceHubInternal.kt @@ -1,4 +1,4 @@ -package net.corda.node.services +package net.corda.node.testing import com.codahale.metrics.MetricRegistry import net.corda.core.flows.FlowInitiator @@ -63,7 +63,7 @@ open class MockServiceHubInternal( override val clock: Clock get() = overrideClock ?: throw UnsupportedOperationException() override val myInfo: NodeInfo - get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1) // Required to get a dummy platformVersion when required for tests. + get() = NodeInfo(listOf(MOCK_HOST_AND_PORT), DUMMY_IDENTITY_1, NonEmptySet.of(DUMMY_IDENTITY_1), 1, serial = 1L) // Required to get a dummy platformVersion when required for tests. override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val rpcFlows: List>> get() = throw UnsupportedOperationException() diff --git a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt index de7c78d5cc..c22b5fb32d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -223,7 +223,8 @@ sealed class PortAllocation { * } * * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture] - * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service. + * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service or + * loaded node data from database. * * The driver implicitly bootstraps a [NetworkMapService]. * @@ -698,7 +699,8 @@ class DriverDSL( // node port numbers to be shifted, so all demos and docs need to be updated accordingly. "webAddress" to webAddress.toString(), "p2pAddress" to dedicatedNetworkMapAddress.toString(), - "useTestClock" to useTestClock + "useTestClock" to useTestClock, + "extraAdvertisedServiceIds" to listOf(ServiceInfo(NetworkMapService.type).toString()) ) ) return startNodeInternal(config, webAddress, startInProcess) @@ -716,7 +718,7 @@ class DriverDSL( ) return nodeAndThreadFuture.flatMap { (node, thread) -> establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, openFuture()).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().map { + rpc.waitUntilNetworkReady().map { NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread) } } @@ -731,9 +733,9 @@ class DriverDSL( } // We continue to use SSL enabled port for RPC when its for node user. establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc -> - // Call waitUntilRegisteredWithNetworkMap in background in case RPC is failing over: + // Call waitUntilNetworkReady in background in case RPC is failing over: val networkMapFuture = executorService.fork { - rpc.waitUntilRegisteredWithNetworkMap() + rpc.waitUntilNetworkReady() }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt index ae80cc56da..75cdade8e0 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt @@ -4,11 +4,11 @@ import co.paralleluniverse.common.util.VisibleForTesting import net.corda.core.crypto.entropyToKeyPair import net.corda.core.identity.Party import net.corda.core.node.NodeInfo -import net.corda.core.node.ServiceHub import net.corda.core.node.services.NetworkMapCache import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NonEmptySet -import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.testing.getTestPartyAndCertificate import net.corda.testing.getTestX509Name import rx.Observable @@ -18,7 +18,7 @@ import java.math.BigInteger /** * Network map cache with no backing map service. */ -class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serviceHub) { +class MockNetworkMapCache(serviceHub: ServiceHubInternal) : PersistentNetworkMapCache(serviceHub) { private companion object { val BANK_C = getTestPartyAndCertificate(getTestX509Name("Bank C"), entropyToKeyPair(BigInteger.valueOf(1000)).public) val BANK_D = getTestPartyAndCertificate(getTestX509Name("Bank D"), entropyToKeyPair(BigInteger.valueOf(2000)).public) @@ -29,8 +29,8 @@ class MockNetworkMapCache(serviceHub: ServiceHub) : InMemoryNetworkMapCache(serv override val changed: Observable = PublishSubject.create() init { - val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1) - val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1) + val mockNodeA = NodeInfo(listOf(BANK_C_ADDR), BANK_C, NonEmptySet.of(BANK_C), 1, serial = 1L) + val mockNodeB = NodeInfo(listOf(BANK_D_ADDR), BANK_D, NonEmptySet.of(BANK_D), 1, serial = 1L) registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB runWithoutMapService() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index eae9105499..f014135766 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -212,7 +212,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun noNetworkMapConfigured() = doneFuture(Unit) // There is no need to slow down the unit tests by initialising CityDatabase - override fun findMyLocation(): WorldMapLocation? = null + open fun findMyLocation(): WorldMapLocation? = null // It's left only for NetworkVisualiserSimulation override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) @@ -302,7 +302,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return nodeFactory.create(config, this, networkMapAddress, advertisedServices.toSet(), id, overrideServices, entropyRoot).apply { if (start) { start() - if (threadPerNode && networkMapAddress != null) networkMapRegistrationFuture.getOrThrow() // XXX: What about manually-started nodes? + if (threadPerNode && networkMapAddress != null) nodeReadyFuture.getOrThrow() // XXX: What about manually-started nodes? } _nodes.add(this) } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index f68746fc1a..ab496c749a 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -160,7 +160,7 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub { override val clock: Clock get() = Clock.systemUTC() override val myInfo: NodeInfo get() { val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public) - return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1) + return NodeInfo(emptyList(), identity, NonEmptySet.of(identity), 1, serial = 1L) } override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index 8bd949ca60..6f5f55a365 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.commonName import net.corda.core.crypto.getX509Name +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.map @@ -19,6 +20,8 @@ import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.configOf import net.corda.node.services.config.plus +import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.nodeapi.User @@ -76,6 +79,13 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { portNotBoundChecks.transpose().getOrThrow() } + /** + * Clear network map data from nodes' databases. + */ + fun clearAllNodeInfoDb() { + nodes.forEach { it.services.networkMapCache.clearNetworkMapCache() } + } + /** * You can use this method to start the network map node in a more customised manner. Otherwise it * will automatically be started with the default parameters. @@ -85,30 +95,44 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), configOverrides: Map = emptyMap()): Node { - check(_networkMapNode == null) - return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply { + check(_networkMapNode == null || _networkMapNode!!.info.legalIdentity.name == legalName) + return startNodeInternal(legalName, platformVersion, advertisedServices + ServiceInfo(NetworkMapService.type), rpcUsers, configOverrides).apply { _networkMapNode = this } } + @JvmOverloads fun startNode(legalName: X500Name, platformVersion: Int = 1, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap()): CordaFuture { + configOverrides: Map = emptyMap(), + noNetworkMap: Boolean = false, + waitForConnection: Boolean = true): CordaFuture { + val networkMapConf = if (noNetworkMap) { + // Nonexistent network map service address. + mapOf( + "networkMapService" to mapOf( + "address" to "localhost:10000", + "legalName" to networkMapNode.info.legalIdentity.name.toString() + ) + ) + } else { + mapOf( + "networkMapService" to mapOf( + "address" to networkMapNode.configuration.p2pAddress.toString(), + "legalName" to networkMapNode.info.legalIdentity.name.toString() + ) + ) + } val node = startNodeInternal( legalName, platformVersion, advertisedServices, rpcUsers, - mapOf( - "networkMapService" to mapOf( - "address" to networkMapNode.configuration.p2pAddress.toString(), - "legalName" to networkMapNode.info.legalIdentity.name.toString() - ) - ) + configOverrides - ) - return node.networkMapRegistrationFuture.map { node } + networkMapConf + configOverrides, + noNetworkMap) + return if (waitForConnection) node.nodeReadyFuture.map { node } else doneFuture(node) } fun startNotaryCluster(notaryName: X500Name, @@ -149,18 +173,21 @@ abstract class NodeBasedTest : TestDependencyInjectionBase() { platformVersion: Int, advertisedServices: Set, rpcUsers: List, - configOverrides: Map): Node { + configOverrides: Map, + noNetworkMap: Boolean = false): Node { val baseDirectory = baseDirectory(legalName).createDirectories() val localPort = getFreeLocalPorts("localhost", 2) + val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() val config = ConfigHelper.loadConfig( baseDirectory = baseDirectory, allowMissingConfig = true, configOverrides = configOf( "myLegalName" to legalName.toString(), - "p2pAddress" to localPort[0].toString(), + "p2pAddress" to p2pAddress, "rpcAddress" to localPort[1].toString(), "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, - "rpcUsers" to rpcUsers.map { it.toMap() } + "rpcUsers" to rpcUsers.map { it.toMap() }, + "noNetworkMap" to noNetworkMap ) + configOverrides ) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt index f10d1b12cf..a3de30fef1 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -15,8 +15,8 @@ import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.testing.MockServiceHubInternal import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase @@ -42,7 +42,8 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity)) val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1) // TODO: We should have a dummy service hub rather than change behaviour in tests - val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, InMemoryNetworkMapCache(serviceHub = null), userService) + val broker = ArtemisMessagingServer(config, address.port, rpcAddress.port, + MockNetworkMapCache(serviceHub = object : MockServiceHubInternal(database = database, configuration = config) {}), userService) val networkMapRegistrationFuture = openFuture() val network = database.transaction { NodeMessagingClient( diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt index cfec8b3757..1bf02e2a52 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt @@ -1,6 +1,8 @@ package net.corda.demobench.model import com.typesafe.config.Config +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType import net.corda.core.utilities.parseNetworkHostAndPort import org.bouncycastle.asn1.x500.X500Name import tornadofx.* @@ -51,14 +53,14 @@ class InstallFactory : Controller() { return port } - private fun Config.parseExtraServices(path: String): List { - val services = serviceController.services.toSortedSet() + private fun Config.parseExtraServices(path: String): MutableList { + val services = serviceController.services.toSortedSet() + ServiceInfo(ServiceType.networkMap).toString() return this.getStringList(path) .filter { !it.isNullOrEmpty() } .map { svc -> require(svc in services, { "Unknown service '$svc'." }) svc - }.toList() + }.toMutableList() } } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt index 64406da824..fc13438570 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt @@ -16,7 +16,7 @@ class NodeConfig( val rpcPort: Int, val webPort: Int, val h2Port: Int, - val extraServices: List, + val extraServices: MutableList = mutableListOf(), val users: List = listOf(defaultUser), var networkMap: NetworkMapConfig? = null ) : NetworkMapConfig(legalName, p2pPort), HasPlugins { diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt index 0b484aacda..18750dd4ae 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt @@ -1,6 +1,8 @@ package net.corda.demobench.model import net.corda.core.crypto.getX509Name +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType import net.corda.demobench.plugin.PluginController import net.corda.demobench.pty.R3Pty import tornadofx.* @@ -54,15 +56,15 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { baseDir, getX509Name( myLegalName = nodeData.legalName.value.trim(), - email = "corda@city.${location.countryCode.toLowerCase()}.example", - nearestCity = location.description, + email = "corda@city.${location.countryCode!!.toLowerCase()}.example", + nearestCity = location.description!!, country = location.countryCode ), nodeData.p2pPort.value, nodeData.rpcPort.value, nodeData.webPort.value, nodeData.h2Port.value, - nodeData.extraServices.value + nodeData.extraServices.toMutableList() ) if (nodes.putIfAbsent(config.key, config) != null) { @@ -98,6 +100,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { if (hasNetworkMap()) { config.networkMap = networkMapConfig } else { + config.extraServices.add(ServiceInfo(ServiceType.networkMap).toString()) networkMapConfig = config log.info("Network map provided by: ${config.legalName}") } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt index cf9822f969..71566e98a4 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/views/NodeTabView.kt @@ -220,7 +220,7 @@ class NodeTabView : Fragment() { imageview { image = flags.get()[it.countryCode] } - label(it.description) + label(it.description!!) alignment = Pos.CENTER_LEFT } } diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt index 6fee64384a..e73cbef9c7 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt @@ -82,7 +82,7 @@ class NodeConfigTest { @Test fun `test services`() { - val config = createConfig(services = listOf("my.service")) + val config = createConfig(services = mutableListOf("my.service")) assertEquals(listOf("my.service"), config.extraServices) } @@ -107,13 +107,13 @@ class NodeConfigTest { @Test fun `test cash issuer`() { - val config = createConfig(services = listOf("corda.issuer.GBP")) + val config = createConfig(services = mutableListOf("corda.issuer.GBP")) assertTrue(config.isCashIssuer) } @Test fun `test not cash issuer`() { - val config = createConfig(services = listOf("corda.issuerubbish")) + val config = createConfig(services = mutableListOf("corda.issuerubbish")) assertFalse(config.isCashIssuer) } @@ -138,7 +138,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) assertEquals(prettyPrint("{" @@ -164,7 +164,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -193,7 +193,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -223,7 +223,7 @@ class NodeConfigTest { rpcPort = 40002, webPort = 20001, h2Port = 30001, - services = listOf("my.service"), + services = mutableListOf("my.service"), users = listOf(user("jenny")) ) config.networkMap = NetworkMapConfig(DUMMY_NOTARY.name, 12345) @@ -257,7 +257,7 @@ class NodeConfigTest { rpcPort: Int = -1, webPort: Int = -1, h2Port: Int = -1, - services: List = listOf("extra.service"), + services: MutableList = mutableListOf("extra.service"), users: List = listOf(user("guest")) ) = NodeConfig( baseDir, diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt index d476ccc93a..40cee11f4a 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt @@ -170,7 +170,7 @@ class NodeControllerTest { rpcPort: Int = -1, webPort: Int = -1, h2Port: Int = -1, - services: List = listOf("extra.service"), + services: MutableList = mutableListOf("extra.service"), users: List = listOf(user("guest")) ) = NodeConfig( baseDir, diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt index 23dcb222cf..2c33129de9 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/Network.kt @@ -27,10 +27,13 @@ import javafx.util.Duration import net.corda.client.jfx.model.* import net.corda.client.jfx.utils.* import net.corda.core.contracts.ContractState +import net.corda.core.crypto.locationOrNull import net.corda.core.crypto.toBase58String import net.corda.core.identity.Party +import net.corda.core.node.CityDatabase import net.corda.core.node.NodeInfo import net.corda.core.node.ScreenCoordinate +import net.corda.core.node.WorldMapLocation import net.corda.explorer.formatters.PartyNameFormatter import net.corda.explorer.model.CordaView import tornadofx.* @@ -99,7 +102,7 @@ class Network : CordaView() { copyableLabel(SimpleObjectProperty(node.legalIdentity.owningKey.toBase58String())).apply { minWidth = 400.0 } } row("Services :") { label(node.advertisedServices.map { it.info }.joinToString(", ")) } - node.worldMapLocation?.apply { row("Location :") { label(this@apply.description) } } + node.getWorldMapLocation()?.apply { row("Location :") { label(this@apply.description!!) } } } } setOnMouseClicked { @@ -123,7 +126,7 @@ class Network : CordaView() { contentDisplay = ContentDisplay.TOP val coordinate = Bindings.createObjectBinding({ // These coordinates are obtained when we generate the map using TileMill. - node.worldMapLocation?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) + node.getWorldMapLocation()?.coordinate?.project(mapPane.width, mapPane.height, 85.0511, -85.0511, -180.0, 180.0) ?: ScreenCoordinate(0.0, 0.0) }, arrayOf(mapPane.widthProperty(), mapPane.heightProperty())) // Center point of the label. layoutXProperty().bind(coordinate.map { it.screenX - width / 2 }) diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt index 254b8aacb2..7e70be42b0 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt @@ -185,7 +185,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List