ENT-2489: Introduce NetworkMapUpdater into FlowWorkerServiceHub (#1396)

Without this change RpcWorkerTest currently in `master` just hangs
as FlowWorker never deemed to be ready from NM point of view.
This commit is contained in:
Viktor Kolomeyko 2018-09-18 15:16:20 +01:00 committed by GitHub
parent 2f9763ed47
commit 18d4013d0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 10 deletions

View File

@ -53,6 +53,7 @@ class FlowWorkerTest {
private val portAllocation = PortAllocation.Incremental(10000)
// TODO: Convert to Signed Network Parameters and pass into FlowWorkerServiceHub constructor.
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
@ -227,7 +228,7 @@ class FlowWorkerTest {
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa)
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, TODO())
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
return Pair(flowWorker, flowWorkerServiceHub)

View File

@ -12,6 +12,7 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sign
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.VisibleForTesting
@ -49,7 +50,9 @@ import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.*
import net.corda.node.services.schema.NodeSchemaService
@ -58,16 +61,19 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.isH2Database
import net.corda.serialization.internal.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import rx.Observable
import rx.schedulers.Schedulers
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
@ -75,7 +81,11 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.reflect.KClass
class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo, override val networkParameters: NetworkParameters, private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate) : ServiceHubInternal, SingletonSerializeAsToken() {
class FlowWorkerServiceHub(override val configuration: NodeConfiguration, override val myInfo: NodeInfo,
private val ourKeyPair: KeyPair, private val trustRoot: X509Certificate, private val nodeCa: X509Certificate,
private val signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned) : ServiceHubInternal, SingletonSerializeAsToken() {
override val networkParameters: NetworkParameters = signedNetworkParameters.networkParameters
override val clock: CordaClock = SimpleClock(Clock.systemUTC())
private val versionInfo = getVersionInfo()
@ -135,10 +145,19 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
override val monitoringService = MonitoringService(metricRegistry).tokenize()
override val networkMapUpdater: NetworkMapUpdater
get() {
throw NotImplementedError()
}
private val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
override val networkMapUpdater= NetworkMapUpdater(
networkMapCache,
NodeInfoWatcher(
configuration.baseDirectory,
@Suppress("LeakingThis")
Schedulers.io(),
Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)
),
networkMapClient,
configuration.baseDirectory,
configuration.extraNetworkMapKeys
).closeOnStop()
private val transactionVerifierWorkerCount = 4
@Suppress("LeakingThis")
@ -376,6 +395,11 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
identityService.ourNames = myInfo.legalIdentities.map { it.name }.toSet()
startMessagingService()
val nodeInfoAndSigned = NodeInfoAndSigned(myInfo) { _, serialised ->
ourKeyPair.private.sign(serialised.bytes)
}
networkMapUpdater.start(trustRoot, signedNetworkParameters.signed.raw.hash, nodeInfoAndSigned.signed.raw.hash)
database.transaction {
identityService.loadIdentities(myInfo.legalIdentitiesAndCerts)
attachments.start()

View File

@ -16,7 +16,6 @@ import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.createDirectory
import net.corda.core.internal.div
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
@ -89,7 +88,7 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
flowWorkerConfigs.map {
val (flowWorker, _) = createFlowWorker(it, myInfo, signedNetworkParameters.networkParameters, ourKeyPair, trustRoot, nodeCa)
val (flowWorker, _) = createFlowWorker(it, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
shutdownManager.registerShutdown { flowWorker.stop() }
}
@ -187,8 +186,8 @@ private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: In
return broker
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, networkParameters, ourKeyPair, trustRoot, nodeCa)
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate, signedNetworkParameters: NetworkParametersReader.NetworkParametersAndSigned): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
return Pair(flowWorker, flowWorkerServiceHub)