mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
First approach to network parameters updates (#2412)
* Network parameters updates Add two RPC methods networkParametersFeed and acceptNewNetworkParameters. Implementation of client handling of network parameters update event. Partial implementation of accepting new parameters and installing them on the node as well as node startup with updated parameters. Move reading of network parameters on startup to separate NetworkParametersReader class. Add tests. Move NetworkParameters and NotaryInfo classes to core. * Ignore evolvability test - to be fixed later * Add documentation on update process
This commit is contained in:
committed by
GitHub
parent
cbe947694d
commit
6acff3a7df
@ -25,7 +25,7 @@ import net.corda.node.services.transactions.minClusterSize
|
||||
import net.corda.node.services.transactions.minCorrectReplicas
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.network.NotaryInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.testing.core.chooseIdentity
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
|
@ -8,8 +8,8 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkParameters
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
@ -45,7 +45,7 @@ class NetworkMapTest {
|
||||
networkMapServer = NetworkMapServer(cacheTimeout, portAllocation.nextHostAndPort())
|
||||
val address = networkMapServer.start()
|
||||
compatibilityZone = CompatibilityZoneParams(URL("http://$address"), publishNotaries = {
|
||||
networkMapServer.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()))
|
||||
networkMapServer.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()), epoch = 2)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -8,8 +8,6 @@ import net.corda.confidential.SwapIdentitiesHandler
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -54,14 +52,11 @@ import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.shell.InteractiveShell
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkParameters
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import net.corda.nodeapi.internal.sign
|
||||
import net.corda.nodeapi.internal.storeLegalIdentity
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
|
||||
@ -137,7 +132,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
private val _nodeReadyFuture = openFuture<Unit>()
|
||||
protected var networkMapClient: NetworkMapClient? = null
|
||||
|
||||
protected lateinit var networkMapUpdater: NetworkMapUpdater
|
||||
lateinit var securityManager: RPCSecurityManager
|
||||
|
||||
/** Completes once the node has successfully registered with the network map service
|
||||
@ -165,14 +160,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
validateKeystore()
|
||||
}
|
||||
|
||||
private inline fun signNodeInfo(nodeInfo: NodeInfo, sign: (PublicKey, SerializedBytes<NodeInfo>) -> DigitalSignature): SignedNodeInfo {
|
||||
// For now we exclude any composite identities, see [SignedNodeInfo]
|
||||
val owningKeys = nodeInfo.legalIdentities.map { it.owningKey }.filter { it !is CompositeKey }
|
||||
val serialised = nodeInfo.serialize()
|
||||
val signatures = owningKeys.map { sign(it, serialised) }
|
||||
return SignedNodeInfo(serialised, signatures)
|
||||
}
|
||||
|
||||
open fun generateAndSaveNodeInfo(): NodeInfo {
|
||||
check(started == null) { "Node has already been started" }
|
||||
log.info("Generating nodeInfo ...")
|
||||
@ -185,7 +172,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val signedNodeInfo = signNodeInfo(info) { publicKey, serialised ->
|
||||
val signedNodeInfo = info.sign { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
@ -202,7 +189,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
val identityService = makeIdentityService(identity.certificate)
|
||||
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
|
||||
retrieveNetworkParameters(identityService.trustRoot)
|
||||
networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
|
||||
@ -241,14 +231,15 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
startShell(rpcOps)
|
||||
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
}
|
||||
val networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient,
|
||||
networkParameters.serialize().hash)
|
||||
networkParameters.serialize().hash,
|
||||
configuration.baseDirectory)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
networkMapUpdater.updateNodeInfo(services.myInfo) {
|
||||
signNodeInfo(it) { publicKey, serialised ->
|
||||
it.sign { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
}
|
||||
@ -633,29 +624,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
return PersistentKeyManagementService(identityService, keyPairs)
|
||||
}
|
||||
|
||||
private fun retrieveNetworkParameters(trustRoot: X509Certificate) {
|
||||
val networkParamsFile = configuration.baseDirectory / NETWORK_PARAMS_FILE_NAME
|
||||
|
||||
networkParameters = if (networkParamsFile.exists()) {
|
||||
networkParamsFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>().verifiedNetworkMapCert(trustRoot)
|
||||
} else {
|
||||
log.info("No network-parameters file found. Expecting network parameters to be available from the network map.")
|
||||
val networkMapClient = checkNotNull(networkMapClient) {
|
||||
"Node hasn't been configured to connect to a network map from which to get the network parameters"
|
||||
}
|
||||
val (networkMap, _) = networkMapClient.getNetworkMap()
|
||||
val signedParams = networkMapClient.getNetworkParameters(networkMap.networkParameterHash)
|
||||
val verifiedParams = signedParams.verifiedNetworkMapCert(trustRoot)
|
||||
signedParams.serialize().open().copyTo(configuration.baseDirectory / NETWORK_PARAMS_FILE_NAME)
|
||||
verifiedParams
|
||||
}
|
||||
|
||||
log.info("Loaded network parameters: $networkParameters")
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
}
|
||||
|
||||
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
|
||||
val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
|
||||
return notaryConfig.run {
|
||||
@ -785,6 +753,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
override val networkService: MessagingService get() = network
|
||||
override val clock: Clock get() = platformClock
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
override val networkMapUpdater: NetworkMapUpdater get() = this@AbstractNode.networkMapUpdater
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
|
||||
return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
|
||||
|
@ -13,12 +13,14 @@ import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.sign
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
@ -48,6 +50,18 @@ internal class CordaRPCOpsImpl(
|
||||
return snapshot
|
||||
}
|
||||
|
||||
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
return services.networkMapUpdater.track()
|
||||
}
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) {
|
||||
services.networkMapUpdater.acceptNewNetworkParameters(
|
||||
parametersHash,
|
||||
// TODO When multiple identities design will be better specified this should be signature from node operator.
|
||||
{ hash -> hash.serialize().sign { services.keyManagementService.sign(it.bytes, services.myInfo.legalIdentities[0].owningKey) } }
|
||||
)
|
||||
}
|
||||
|
||||
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
|
||||
return database.transaction {
|
||||
services.networkMapCache.track()
|
||||
|
@ -0,0 +1,81 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.network.NetworkMapClient
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.security.cert.X509Certificate
|
||||
|
||||
class NetworkParametersReader(private val trustRoot: X509Certificate,
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val baseDirectory: Path) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private val networkParamsFile = baseDirectory / NETWORK_PARAMS_FILE_NAME
|
||||
private val parametersUpdateFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val networkParameters by lazy { retrieveNetworkParameters() }
|
||||
|
||||
private fun retrieveNetworkParameters(): NetworkParameters {
|
||||
val advertisedParametersHash = networkMapClient?.getNetworkMap()?.networkMap?.networkParameterHash
|
||||
val signedParametersFromFile = if (networkParamsFile.exists()) {
|
||||
networkParamsFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
} else {
|
||||
null
|
||||
}
|
||||
val parameters = if (advertisedParametersHash != null) {
|
||||
// TODO On one hand we have node starting without parameters and just accepting them by default,
|
||||
// on the other we have parameters update process - it needs to be unified. Say you start the node, you don't have matching parameters,
|
||||
// you get them from network map, but you have to run the approval step.
|
||||
if (signedParametersFromFile == null) { // Node joins for the first time.
|
||||
downloadParameters(trustRoot, advertisedParametersHash)
|
||||
}
|
||||
else if (signedParametersFromFile.raw.hash == advertisedParametersHash) { // Restarted with the same parameters.
|
||||
signedParametersFromFile.verifiedNetworkMapCert(trustRoot)
|
||||
} else { // Update case.
|
||||
readParametersUpdate(advertisedParametersHash, signedParametersFromFile.raw.hash).verifiedNetworkMapCert(trustRoot)
|
||||
}
|
||||
} else { // No compatibility zone configured. Node should proceed with parameters from file.
|
||||
signedParametersFromFile?.verifiedNetworkMapCert(trustRoot) ?: throw IllegalArgumentException("Couldn't find network parameters file and compatibility zone wasn't configured")
|
||||
}
|
||||
logger.info("Loaded network parameters: $parameters")
|
||||
return parameters
|
||||
}
|
||||
|
||||
private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedDataWithCert<NetworkParameters> {
|
||||
if (!parametersUpdateFile.exists()) {
|
||||
throw IllegalArgumentException("Node uses parameters with hash: $previousParametersHash " +
|
||||
"but network map is advertising: ${advertisedParametersHash}.\n" +
|
||||
"Please update node to use correct network parameters file.")
|
||||
}
|
||||
val signedUpdatedParameters = parametersUpdateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
if (signedUpdatedParameters.raw.hash != advertisedParametersHash) {
|
||||
throw IllegalArgumentException("Both network parameters and network parameters update files don't match" +
|
||||
"parameters advertised by network map.\n" +
|
||||
"Please update node to use correct network parameters file.")
|
||||
}
|
||||
parametersUpdateFile.moveTo(networkParamsFile, StandardCopyOption.REPLACE_EXISTING)
|
||||
return signedUpdatedParameters
|
||||
}
|
||||
|
||||
// Used only when node joins for the first time.
|
||||
private fun downloadParameters(trustRoot: X509Certificate, parametersHash: SecureHash): NetworkParameters {
|
||||
logger.info("No network-parameters file found. Expecting network parameters to be available from the network map.")
|
||||
val networkMapClient = checkNotNull(networkMapClient) {
|
||||
"Node hasn't been configured to connect to a network map from which to get the network parameters"
|
||||
}
|
||||
val signedParams = networkMapClient.getNetworkParameters(parametersHash)
|
||||
val verifiedParams = signedParams.verifiedNetworkMapCert(trustRoot)
|
||||
signedParams.serialize().open().copyTo(baseDirectory / NETWORK_PARAMS_FILE_NAME)
|
||||
return verifiedParams
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
@ -20,6 +21,13 @@ import java.security.PublicKey
|
||||
|
||||
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
|
||||
class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val context: () -> RpcAuthContext) : CordaRPCOps {
|
||||
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> = guard("networkParametersFeed") {
|
||||
implementation.networkParametersFeed()
|
||||
}
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) = guard("acceptNewNetworkParameters") {
|
||||
implementation.acceptNewNetworkParameters(parametersHash)
|
||||
}
|
||||
|
||||
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash = guard("uploadAttachmentWithMetadata") {
|
||||
implementation.uploadAttachmentWithMetadata(jar, uploader, filename)
|
||||
|
@ -20,6 +20,7 @@ import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.internal.cordapp.CordappProviderInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.network.NetworkMapUpdater
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
|
||||
@ -64,6 +65,7 @@ interface ServiceHubInternal : ServiceHub {
|
||||
val networkService: MessagingService
|
||||
val database: CordaPersistence
|
||||
val configuration: NodeConfiguration
|
||||
val networkMapUpdater: NetworkMapUpdater
|
||||
override val cordappProvider: CordappProviderInternal
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
require(txs.any()) { "No transactions passed in for recording" }
|
||||
|
@ -2,11 +2,13 @@ package net.corda.node.services.network
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.SignedDataWithCert
|
||||
import net.corda.core.internal.checkOkResponse
|
||||
import net.corda.core.internal.openHttpConnection
|
||||
import net.corda.core.internal.responseAs
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.minutes
|
||||
@ -16,40 +18,42 @@ import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.registration.cacheControl
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.nodeapi.internal.network.NetworkParameters
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import okhttp3.CacheControl
|
||||
import okhttp3.Headers
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.BufferedReader
|
||||
import java.io.Closeable
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.security.cert.X509Certificate
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NetworkMapClient(compatibilityZoneURL: URL, private val trustedRoot: X509Certificate) {
|
||||
class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
|
||||
|
||||
fun publish(signedNodeInfo: SignedNodeInfo) {
|
||||
val publishURL = URL("$networkMapUrl/publish")
|
||||
logger.trace { "Publishing NodeInfo to $publishURL." }
|
||||
publishURL.openHttpConnection().apply {
|
||||
doOutput = true
|
||||
requestMethod = "POST"
|
||||
setRequestProperty("Content-Type", "application/octet-stream")
|
||||
outputStream.use { signedNodeInfo.serialize().open().copyTo(it) }
|
||||
checkOkResponse()
|
||||
}
|
||||
publishURL.post(signedNodeInfo.serialize())
|
||||
logger.trace { "Published NodeInfo to $publishURL successfully." }
|
||||
}
|
||||
|
||||
fun ackNetworkParametersUpdate(signedParametersHash: SignedData<SecureHash>) {
|
||||
val ackURL = URL("$networkMapUrl/ack-parameters")
|
||||
logger.trace { "Sending network parameters with hash ${signedParametersHash.raw.deserialize()} approval to $ackURL." }
|
||||
ackURL.post(signedParametersHash.serialize())
|
||||
logger.trace { "Sent network parameters approval to $ackURL successfully." }
|
||||
}
|
||||
|
||||
fun getNetworkMap(): NetworkMapResponse {
|
||||
logger.trace { "Fetching network map update from $networkMapUrl." }
|
||||
val connection = networkMapUrl.openHttpConnection()
|
||||
@ -90,12 +94,26 @@ data class NetworkMapResponse(val networkMap: NetworkMap, val cacheMaxAge: Durat
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val currentParametersHash: SecureHash) : Closeable {
|
||||
private val currentParametersHash: SecureHash,
|
||||
private val baseDirectory: Path) : Closeable {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
private val retryInterval = 1.minutes
|
||||
}
|
||||
|
||||
private var newNetworkParameters: Pair<ParametersUpdate, SignedDataWithCert<NetworkParameters>>? = null
|
||||
|
||||
fun track(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
val currentUpdateInfo = newNetworkParameters?.let {
|
||||
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
|
||||
}
|
||||
return DataFeed(
|
||||
currentUpdateInfo,
|
||||
parametersUpdatesTrack
|
||||
)
|
||||
}
|
||||
|
||||
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
|
||||
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
|
||||
private var fileWatcherSubscription: Subscription? = null
|
||||
|
||||
@ -130,8 +148,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
override fun run() {
|
||||
val nextScheduleDelay = try {
|
||||
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
// TODO NetworkParameters updates are not implemented yet. Every mismatch should result in node shutdown.
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(it) }
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
// TODO This needs special handling (node omitted update process/didn't accept new parameters or didn't restart on updateDeadline)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Please update node to use correct network parameters file.\"")
|
||||
System.exit(1)
|
||||
@ -181,4 +200,32 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
}
|
||||
executor.submit(task)
|
||||
}
|
||||
|
||||
private fun handleUpdateNetworkParameters(update: ParametersUpdate) {
|
||||
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already.
|
||||
return
|
||||
}
|
||||
val newParameters = networkMapClient?.getNetworkParameters(update.newParametersHash)
|
||||
if (newParameters != null) {
|
||||
logger.info("Downloaded new network parameters: $newParameters from the update: $update")
|
||||
newNetworkParameters = Pair(update, newParameters)
|
||||
parametersUpdatesTrack.onNext(ParametersUpdateInfo(update.newParametersHash, newParameters.verifiedNetworkMapCert(networkMapClient!!.trustedRoot), update.description, update.updateDeadline))
|
||||
}
|
||||
}
|
||||
|
||||
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
|
||||
networkMapClient ?: throw IllegalStateException("Network parameters updates are not support without compatibility zone configured")
|
||||
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them. Add persisting of newest parameters from update.
|
||||
val (_, newParams) = newNetworkParameters ?: throw IllegalArgumentException("Couldn't find parameters update for the hash: $parametersHash")
|
||||
val newParametersHash = newParams.verifiedNetworkMapCert(networkMapClient.trustedRoot).serialize().hash // We should check that we sign the right data structure hash.
|
||||
if (parametersHash == newParametersHash) {
|
||||
// The latest parameters have priority.
|
||||
newParams.serialize()
|
||||
.open()
|
||||
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
|
||||
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
||||
} else {
|
||||
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map advertises update with hash $newParametersHash. Please check newest version")
|
||||
}
|
||||
}
|
||||
}
|
@ -7,6 +7,7 @@ import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.schemas.NodeInfoSchemaV1
|
||||
@ -23,7 +24,6 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.nodeapi.internal.network.NotaryInfo
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
|
@ -7,9 +7,9 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.nodeapi.internal.network.NetworkParameters
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.network.NotaryInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
|
@ -1,12 +1,15 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.sha256
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DEV_ROOT_CA
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
@ -20,6 +23,8 @@ import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.io.IOException
|
||||
import java.net.URL
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapClientTest {
|
||||
@ -90,4 +95,23 @@ class NetworkMapClientTest {
|
||||
fun `get hostname string from http response correctly`() {
|
||||
assertEquals("test.host.name", networkMapClient.myPublicHostname())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `handle parameters update`() {
|
||||
val nextParameters = testNetworkParameters(emptyList(), epoch = 2)
|
||||
val originalNetworkParameterHash = server.networkParameters.serialize().hash
|
||||
val nextNetworkParameterHash = nextParameters.serialize().hash
|
||||
val description = "Test parameters"
|
||||
server.scheduleParametersUpdate(nextParameters, description, Instant.now().plus(1, ChronoUnit.DAYS))
|
||||
val (networkMap) = networkMapClient.getNetworkMap()
|
||||
assertEquals(networkMap.networkParameterHash, originalNetworkParameterHash)
|
||||
assertEquals(networkMap.parametersUpdate?.description, description)
|
||||
assertEquals(networkMap.parametersUpdate?.newParametersHash, nextNetworkParameterHash)
|
||||
assertEquals(networkMapClient.getNetworkParameters(originalNetworkParameterHash).verified(), server.networkParameters)
|
||||
assertEquals(networkMapClient.getNetworkParameters(nextNetworkParameterHash).verified(), nextParameters)
|
||||
val keyPair = Crypto.generateKeyPair()
|
||||
val signedHash = nextNetworkParameterHash.serialize().sign(keyPair)
|
||||
networkMapClient.ackNetworkParametersUpdate(signedHash)
|
||||
assertEquals(nextNetworkParameterHash, server.latestParametersAccepted(keyPair.public))
|
||||
}
|
||||
}
|
||||
|
@ -7,19 +7,27 @@ import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.times
|
||||
import com.nhaarman.mockito_kotlin.verify
|
||||
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
@ -27,8 +35,11 @@ import org.junit.After
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.schedulers.TestScheduler
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapUpdaterTest {
|
||||
@Rule
|
||||
@ -39,13 +50,16 @@ class NetworkMapUpdaterTest {
|
||||
private val baseDir = fs.getPath("/node")
|
||||
private val networkMapCache = createMockNetworkMapCache()
|
||||
private val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedNodeInfo>()
|
||||
private val networkParamsMap = HashMap<SecureHash, NetworkParameters>()
|
||||
private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa()
|
||||
private val cacheExpiryMs = 100
|
||||
private val networkMapClient = createMockNetworkMapClient()
|
||||
private val scheduler = TestScheduler()
|
||||
private val networkParametersHash = SecureHash.randomSHA256()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
|
||||
private val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
private var parametersUpdate: ParametersUpdate? = null
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
@ -180,18 +194,73 @@ class NetworkMapUpdaterTest {
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `emit new parameters update info on parameters update from network map`() {
|
||||
val paramsFeed = updater.track()
|
||||
val snapshot = paramsFeed.snapshot
|
||||
val updates = paramsFeed.updates.bufferUntilSubscribed()
|
||||
assertEquals(null, snapshot)
|
||||
val newParameters = testNetworkParameters(emptyList(), epoch = 2)
|
||||
val updateDeadline = Instant.now().plus(1, ChronoUnit.DAYS)
|
||||
scheduleParametersUpdate(newParameters, "Test update", updateDeadline)
|
||||
updater.subscribeToNetworkMap()
|
||||
updates.expectEvents(isStrict = false) {
|
||||
sequence(
|
||||
expect { update: ParametersUpdateInfo ->
|
||||
assertThat(update.updateDeadline == updateDeadline)
|
||||
assertThat(update.description == "Test update")
|
||||
assertThat(update.hash == newParameters.serialize().hash)
|
||||
assertThat(update.parameters == newParameters)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `ack network parameters update`() {
|
||||
val newParameters = testNetworkParameters(emptyList(), epoch = 314)
|
||||
scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
|
||||
updater.subscribeToNetworkMap()
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
val newHash = newParameters.serialize().hash
|
||||
val keyPair = Crypto.generateKeyPair()
|
||||
updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair)})
|
||||
verify(networkMapClient).ackNetworkParametersUpdate(any())
|
||||
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val signedNetworkParams = updateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
assertEquals(newParameters, paramsFromFile)
|
||||
}
|
||||
|
||||
private fun scheduleParametersUpdate(nextParameters: NetworkParameters, description: String, updateDeadline: Instant) {
|
||||
val nextParamsHash = nextParameters.serialize().hash
|
||||
networkParamsMap[nextParamsHash] = nextParameters
|
||||
parametersUpdate = ParametersUpdate(nextParamsHash, description, updateDeadline)
|
||||
}
|
||||
|
||||
private fun createMockNetworkMapClient(): NetworkMapClient {
|
||||
return mock {
|
||||
on { trustedRoot }.then {
|
||||
DEV_ROOT_CA.certificate
|
||||
}
|
||||
on { publish(any()) }.then {
|
||||
val signedNodeInfo: SignedNodeInfo = uncheckedCast(it.arguments[0])
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then {
|
||||
NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), networkParametersHash), cacheExpiryMs.millis)
|
||||
NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), networkParametersHash, parametersUpdate), cacheExpiryMs.millis)
|
||||
}
|
||||
on { getNodeInfo(any()) }.then {
|
||||
nodeInfoMap[it.arguments[0]]?.verified()
|
||||
}
|
||||
on { getNetworkParameters(any()) }.then {
|
||||
val paramsHash: SecureHash = uncheckedCast(it.arguments[0])
|
||||
networkParamsMap[paramsHash]?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate)
|
||||
}
|
||||
on { ackNetworkParametersUpdate(any()) }.then {
|
||||
Unit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,63 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.NetworkParametersReader
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.DEV_ROOT_CA
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.node.internal.network.NetworkMapServer
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.net.URL
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFalse
|
||||
|
||||
class NetworkParametersReaderTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
|
||||
private val cacheTimeout = 100000.seconds
|
||||
|
||||
private lateinit var server: NetworkMapServer
|
||||
private lateinit var networkMapClient: NetworkMapClient
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
server = NetworkMapServer(cacheTimeout, PortAllocation.Incremental(10000).nextHostAndPort())
|
||||
val hostAndPort = server.start()
|
||||
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.port}"), DEV_ROOT_CA.certificate)
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
server.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `read correct set of parameters from file`() {
|
||||
val fs = Jimfs.newFileSystem(Configuration.unix())
|
||||
val baseDirectory = fs.getPath("/node").createDirectories()
|
||||
val oldParameters = testNetworkParameters(emptyList(), epoch = 1)
|
||||
NetworkParametersCopier(oldParameters).install(baseDirectory)
|
||||
NetworkParametersCopier(server.networkParameters, update = true).install(baseDirectory) // Parameters update file.
|
||||
val parameters = NetworkParametersReader(DEV_ROOT_CA.certificate, networkMapClient, baseDirectory).networkParameters
|
||||
assertFalse((baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME).exists())
|
||||
assertEquals(server.networkParameters, parameters)
|
||||
// Parameters from update should be moved to `network-parameters` file.
|
||||
val parametersFromFile = (baseDirectory / NETWORK_PARAMS_FILE_NAME).readAll().deserialize<SignedDataWithCert<NetworkParameters>>().verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
assertEquals(server.networkParameters, parametersFromFile)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user