Various cleanup of the network map code (#2604)

This commit is contained in:
Shams Asari 2018-02-23 09:13:00 +00:00 committed by GitHub
parent e91b74962b
commit 2af0feee04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 232 additions and 208 deletions

View File

@ -147,6 +147,8 @@ inline fun <R> Path.readLines(charset: Charset = UTF_8, block: (Stream<String>)
fun Path.readAllLines(charset: Charset = UTF_8): List<String> = Files.readAllLines(this, charset) fun Path.readAllLines(charset: Charset = UTF_8): List<String> = Files.readAllLines(this, charset)
fun Path.writeLines(lines: Iterable<CharSequence>, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options) fun Path.writeLines(lines: Iterable<CharSequence>, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options)
inline fun <reified T : Any> Path.readObject(): T = readAll().deserialize()
fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options) fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options)
fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "" fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + ""
@ -377,15 +379,14 @@ inline fun <T : Any> SerializedBytes<T>.sign(signer: (SerializedBytes<T>) -> Dig
return SignedData(this, signer(this)) return SignedData(this, signer(this))
} }
inline fun <T : Any> SerializedBytes<T>.sign(keyPair: KeyPair): SignedData<T> { fun <T : Any> SerializedBytes<T>.sign(keyPair: KeyPair): SignedData<T> = SignedData(this, keyPair.sign(this.bytes))
return SignedData(this, keyPair.sign(this.bytes))
}
fun ByteBuffer.copyBytes() = ByteArray(remaining()).also { get(it) } fun ByteBuffer.copyBytes(): ByteArray = ByteArray(remaining()).also { get(it) }
fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoader: ClassLoader, config: CordappConfig): CordappContext { fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoader: ClassLoader, config: CordappConfig): CordappContext {
return CordappContext(cordapp, attachmentId, classLoader, config) return CordappContext(cordapp, attachmentId, classLoader, config)
} }
/** Verifies that the correct notarisation request was signed by the counterparty. */ /** Verifies that the correct notarisation request was signed by the counterparty. */
fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) { fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) {
val requestingParty = otherSideSession.counterparty val requestingParty = otherSideSession.counterparty

View File

@ -143,8 +143,9 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
remainingTransactionSize -= size remainingTransactionSize -= size
} }
// Check attachment size first as they are most likely to go over the limit. // Check attachments size first as they are most likely to go over the limit. With ContractAttachment instances
ltx.attachments.associateBy(Attachment::id).values.forEach { minus(it.size) } // it's likely that the same underlying Attachment CorDapp will occur more than once so we dedup on the attachment id.
ltx.attachments.distinctBy { it.id }.forEach { minus(it.size) }
minus(ltx.inputs.serialize().size) minus(ltx.inputs.serialize().size)
minus(ltx.commands.serialize().size) minus(ltx.commands.serialize().size)
minus(ltx.outputs.serialize().size) minus(ltx.outputs.serialize().size)

View File

@ -15,7 +15,6 @@ import java.security.SignatureException
* A signed [NodeInfo] object containing a signature for each identity. The list of signatures is expected * A signed [NodeInfo] object containing a signature for each identity. The list of signatures is expected
* to be in the same order as the identities. * to be in the same order as the identities.
*/ */
// TODO Move this to net.corda.nodeapi.internal.network
// TODO Add signatures for composite keys. The current thinking is to make sure there is a signature for each leaf key // TODO Add signatures for composite keys. The current thinking is to make sure there is a signature for each leaf key
// that the node owns. This check can only be done by the network map server as it can check with the doorman if a node // that the node owns. This check can only be done by the network map server as it can check with the doorman if a node
// is part of a composite identity. This of course further requires the doorman being able to issue CSRs for composite // is part of a composite identity. This of course further requires the doorman being able to issue CSRs for composite

View File

@ -9,14 +9,13 @@ import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo import net.corda.core.node.NotaryInfo
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.deserialize
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal._contextSerializationEnv import net.corda.core.serialization.internal._contextSerializationEnv
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
@ -150,7 +149,7 @@ class NetworkBootstrapper {
if (nodeConfig.hasPath("notary")) { if (nodeConfig.hasPath("notary")) {
val validating = nodeConfig.getConfig("notary").getBoolean("validating") val validating = nodeConfig.getConfig("notary").getBoolean("validating")
// And the node-info file contains the notary's identity // And the node-info file contains the notary's identity
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified() val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
NotaryInfo(nodeInfo.notaryIdentity(), validating) NotaryInfo(nodeInfo.notaryIdentity(), validating)
} else { } else {
null null

View File

@ -14,6 +14,9 @@ import java.time.Instant
const val NETWORK_PARAMS_FILE_NAME = "network-parameters" const val NETWORK_PARAMS_FILE_NAME = "network-parameters"
const val NETWORK_PARAMS_UPDATE_FILE_NAME = "network-parameters-update" const val NETWORK_PARAMS_UPDATE_FILE_NAME = "network-parameters-update"
typealias SignedNetworkMap = SignedDataWithCert<NetworkMap>
typealias SignedNetworkParameters = SignedDataWithCert<NetworkParameters>
/** /**
* Data structure representing the network map available from the HTTP network map service as a serialised blob. * Data structure representing the network map available from the HTTP network map service as a serialised blob.
* @property nodeInfoHashes list of network participant's [NodeInfo] hashes * @property nodeInfoHashes list of network participant's [NodeInfo] hashes

View File

@ -2,18 +2,20 @@ package net.corda.node.services.network
import net.corda.cordform.CordformNode import net.corda.cordform.CordformNode
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.list
import net.corda.core.internal.readObject
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.core.node.NetworkParameters
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.internal.CompatibilityZoneParams import net.corda.testing.node.internal.CompatibilityZoneParams
@ -64,8 +66,7 @@ class NetworkMapTest {
) { ) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val networkParameters = (alice.baseDirectory / NETWORK_PARAMS_FILE_NAME) val networkParameters = (alice.baseDirectory / NETWORK_PARAMS_FILE_NAME)
.readAll() .readObject<SignedNetworkParameters>()
.deserialize<SignedDataWithCert<NetworkParameters>>()
.verified() .verified()
// We use a random modified time above to make the network parameters unqiue so that we're sure they came // We use a random modified time above to make the network parameters unqiue so that we're sure they came
// from the server // from the server

View File

@ -52,7 +52,7 @@ internal class CordaRPCOpsImpl(
} }
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> { override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
return services.networkMapUpdater.track() return services.networkMapUpdater.trackParametersUpdate()
} }
override fun acceptNewNetworkParameters(parametersHash: SecureHash) { override fun acceptNewNetworkParameters(parametersHash: SecureHash) {

View File

@ -3,12 +3,12 @@ package net.corda.node.internal
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.node.services.network.NetworkMapClient import net.corda.node.services.network.NetworkMapClient
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.StandardCopyOption import java.nio.file.StandardCopyOption
@ -26,9 +26,9 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
val networkParameters by lazy { retrieveNetworkParameters() } val networkParameters by lazy { retrieveNetworkParameters() }
private fun retrieveNetworkParameters(): NetworkParameters { private fun retrieveNetworkParameters(): NetworkParameters {
val advertisedParametersHash = networkMapClient?.getNetworkMap()?.networkMap?.networkParameterHash val advertisedParametersHash = networkMapClient?.getNetworkMap()?.payload?.networkParameterHash
val signedParametersFromFile = if (networkParamsFile.exists()) { val signedParametersFromFile = if (networkParamsFile.exists()) {
networkParamsFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>() networkParamsFile.readObject<SignedNetworkParameters>()
} else { } else {
null null
} }
@ -51,13 +51,13 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
return parameters return parameters
} }
private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedDataWithCert<NetworkParameters> { private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedNetworkParameters {
if (!parametersUpdateFile.exists()) { if (!parametersUpdateFile.exists()) {
throw IllegalArgumentException("Node uses parameters with hash: $previousParametersHash " + throw IllegalArgumentException("Node uses parameters with hash: $previousParametersHash " +
"but network map is advertising: ${advertisedParametersHash}.\n" + "but network map is advertising: $advertisedParametersHash.\n" +
"Please update node to use correct network parameters file.") "Please update node to use correct network parameters file.")
} }
val signedUpdatedParameters = parametersUpdateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>() val signedUpdatedParameters = parametersUpdateFile.readObject<SignedNetworkParameters>()
if (signedUpdatedParameters.raw.hash != advertisedParametersHash) { if (signedUpdatedParameters.raw.hash != advertisedParametersHash) {
throw IllegalArgumentException("Both network parameters and network parameters update files don't match" + throw IllegalArgumentException("Both network parameters and network parameters update files don't match" +
"parameters advertised by network map.\n" + "parameters advertised by network map.\n" +

View File

@ -1,43 +1,27 @@
package net.corda.node.services.network package net.corda.node.services.network
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData import net.corda.core.crypto.SignedData
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.node.utilities.registration.cacheControl import net.corda.node.utilities.registration.cacheControl
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME import net.corda.nodeapi.internal.network.*
import net.corda.nodeapi.internal.network.NetworkMap
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import rx.Subscription
import rx.subjects.PublishSubject
import java.io.BufferedReader import java.io.BufferedReader
import java.io.Closeable
import java.net.URL import java.net.URL
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) { class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) {
companion object { companion object {
private val logger = contextLogger() private val logger = contextLogger()
} }
private val networkMapUrl = URL("$compatibilityZoneURL/network-map") private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
fun publish(signedNodeInfo: SignedNodeInfo) { fun publish(signedNodeInfo: SignedNodeInfo) {
@ -57,10 +41,13 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
fun getNetworkMap(): NetworkMapResponse { fun getNetworkMap(): NetworkMapResponse {
logger.trace { "Fetching network map update from $networkMapUrl." } logger.trace { "Fetching network map update from $networkMapUrl." }
val connection = networkMapUrl.openHttpConnection() val connection = networkMapUrl.openHttpConnection()
val signedNetworkMap = connection.responseAs<SignedDataWithCert<NetworkMap>>() val signedNetworkMap = connection.responseAs<SignedNetworkMap>()
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustedRoot) val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustedRoot)
val timeout = connection.cacheControl().maxAgeSeconds().seconds val timeout = connection.cacheControl().maxAgeSeconds().seconds
logger.trace { "Fetched network map update from $networkMapUrl successfully, retrieved ${networkMap.nodeInfoHashes.size} node info hashes. Node Info hashes: ${networkMap.nodeInfoHashes.joinToString("\n")}" } logger.trace {
"Fetched network map update from $networkMapUrl successfully, retrieved ${networkMap.nodeInfoHashes.size} " +
"node info hashes. Node Info hashes:\n${networkMap.nodeInfoHashes.joinToString("\n")}"
}
return NetworkMapResponse(networkMap, timeout) return NetworkMapResponse(networkMap, timeout)
} }
@ -72,10 +59,10 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
return verifiedNodeInfo return verifiedNodeInfo
} }
fun getNetworkParameters(networkParameterHash: SecureHash): SignedDataWithCert<NetworkParameters> { fun getNetworkParameters(networkParameterHash: SecureHash): SignedNetworkParameters {
val url = URL("$networkMapUrl/network-parameters/$networkParameterHash") val url = URL("$networkMapUrl/network-parameters/$networkParameterHash")
logger.trace { "Fetching network parameters: '$networkParameterHash' from $url." } logger.trace { "Fetching network parameters: '$networkParameterHash' from $url." }
val networkParameter = url.openHttpConnection().responseAs<SignedDataWithCert<NetworkParameters>>() val networkParameter = url.openHttpConnection().responseAs<SignedNetworkParameters>()
logger.trace { "Fetched network parameters: '$networkParameterHash' successfully. Network Parameters: $networkParameter" } logger.trace { "Fetched network parameters: '$networkParameterHash' successfully. Network Parameters: $networkParameter" }
return networkParameter return networkParameter
} }
@ -89,143 +76,4 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
} }
} }
data class NetworkMapResponse(val networkMap: NetworkMap, val cacheMaxAge: Duration) data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration)
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?,
private val currentParametersHash: SecureHash,
private val baseDirectory: Path) : Closeable {
companion object {
private val logger = contextLogger()
private val retryInterval = 1.minutes
}
private var newNetworkParameters: Pair<ParametersUpdate, SignedDataWithCert<NetworkParameters>>? = null
fun track(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
val currentUpdateInfo = newNetworkParameters?.let {
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
}
return DataFeed(
currentUpdateInfo,
parametersUpdatesTrack
)
}
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
private var fileWatcherSubscription: Subscription? = null
override fun close() {
fileWatcherSubscription?.unsubscribe()
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
}
fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedNodeInfo) {
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
// Compare node info without timestamp.
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
// Only publish and write to disk if there are changes to the node info.
val signedNodeInfo = signNodeInfo(newInfo)
networkMapCache.addNode(newInfo)
fileWatcher.saveToFile(signedNodeInfo)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient)
}
}
fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
if (networkMapClient == null) return
// Subscribe to remote network map if configured.
val task = object : Runnable {
override fun run() {
val nextScheduleDelay = try {
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(it) }
if (currentParametersHash != networkMap.networkParameterHash) {
// TODO This needs special handling (node omitted update process/didn't accept new parameters or didn't restart on updateDeadline)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is advertising: ${networkMap.networkParameterHash}.\n" +
"Please update node to use correct network parameters file.\"")
System.exit(1)
}
val currentNodeHashes = networkMapCache.allNodeHashes
val hashesFromNetworkMap = networkMap.nodeInfoHashes
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
// Download new node info from network map
try {
networkMapClient.getNodeInfo(it)
} catch (e: Exception) {
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
null
}
}.forEach {
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNode(it)
}
// Remove node info from network map.
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
.mapNotNull(networkMapCache::getNodeByHash)
.forEach(networkMapCache::removeNode)
cacheTimeout
} catch (t: Throwable) {
logger.warn("Error encountered while updating network map, will retry in ${retryInterval.seconds} seconds", t)
retryInterval
}
// Schedule the next update.
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
}
}
executor.submit(task) // The check may be expensive, so always run it in the background even the first time.
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
val task = object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in ${retryInterval.seconds} seconds.", t)
// TODO: Exponential backoff?
executor.schedule(this, retryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
}
executor.submit(task)
}
private fun handleUpdateNetworkParameters(update: ParametersUpdate) {
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already.
return
}
val newParameters = networkMapClient?.getNetworkParameters(update.newParametersHash)
if (newParameters != null) {
logger.info("Downloaded new network parameters: $newParameters from the update: $update")
newNetworkParameters = Pair(update, newParameters)
parametersUpdatesTrack.onNext(ParametersUpdateInfo(update.newParametersHash, newParameters.verifiedNetworkMapCert(networkMapClient!!.trustedRoot), update.description, update.updateDeadline))
}
}
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
networkMapClient ?: throw IllegalStateException("Network parameters updates are not support without compatibility zone configured")
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them. Add persisting of newest parameters from update.
val (_, newParams) = newNetworkParameters ?: throw IllegalArgumentException("Couldn't find parameters update for the hash: $parametersHash")
val newParametersHash = newParams.verifiedNetworkMapCert(networkMapClient.trustedRoot).serialize().hash // We should check that we sign the right data structure hash.
if (parametersHash == newParametersHash) {
// The latest parameters have priority.
newParams.serialize()
.open()
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
} else {
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map advertises update with hash $newParametersHash. Please check newest version")
}
}
}

View File

@ -0,0 +1,178 @@
package net.corda.node.services.network
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.internal.copyTo
import net.corda.core.internal.div
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import rx.Subscription
import rx.subjects.PublishSubject
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?,
private val currentParametersHash: SecureHash,
private val baseDirectory: Path
) : AutoCloseable {
companion object {
private val logger = contextLogger()
private val defaultRetryInterval = 1.minutes
}
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
private var fileWatcherSubscription: Subscription? = null
override fun close() {
fileWatcherSubscription?.unsubscribe()
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
}
fun trackParametersUpdate(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
val currentUpdateInfo = newNetworkParameters?.let {
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
}
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun updateNodeInfo(newInfo: NodeInfo, signer: (NodeInfo) -> SignedNodeInfo) {
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
// Compare node info without timestamp.
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
// Only publish and write to disk if there are changes to the node info.
val signedNodeInfo = signer(newInfo)
networkMapCache.addNode(newInfo)
fileWatcher.saveToFile(signedNodeInfo)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient)
}
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
// TODO: Exponential backoff?
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
})
}
fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
if (networkMapClient == null) return
// Subscribe to remote network map if configured.
executor.submit(object : Runnable {
override fun run() {
val nextScheduleDelay = try {
updateNetworkMapCache(networkMapClient)
} catch (t: Throwable) {
logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", t)
defaultRetryInterval
}
// Schedule the next update.
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
}
}) // The check may be expensive, so always run it in the background even the first time.
}
private fun updateNetworkMapCache(networkMapClient: NetworkMapClient): Duration {
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
if (currentParametersHash != networkMap.networkParameterHash) {
// TODO This needs special handling (node omitted update process/didn't accept new parameters or didn't restart on updateDeadline)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Please update node to use correct network parameters file.\"")
System.exit(1)
}
val currentNodeHashes = networkMapCache.allNodeHashes
val hashesFromNetworkMap = networkMap.nodeInfoHashes
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
// Download new node info from network map
try {
networkMapClient.getNodeInfo(it)
} catch (e: Exception) {
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
null
}
}.forEach {
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNode(it)
}
// Remove node info from network map.
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
.mapNotNull(networkMapCache::getNodeByHash)
.forEach(networkMapCache::removeNode)
return cacheTimeout
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
// This update was handled already.
return
}
val newParameters = networkMapClient.getNetworkParameters(update.newParametersHash)
logger.info("Downloaded new network parameters: $newParameters from the update: $update")
newNetworkParameters = Pair(update, newParameters)
val updateInfo = ParametersUpdateInfo(
update.newParametersHash,
newParameters.verifiedNetworkMapCert(networkMapClient.trustedRoot),
update.description,
update.updateDeadline)
parametersUpdatesTrack.onNext(updateInfo)
}
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
networkMapClient ?: throw IllegalStateException("Network parameters updates are not support without compatibility zone configured")
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them.
// Add persisting of newest parameters from update.
val (_, newParams) = requireNotNull(newNetworkParameters) { "Couldn't find parameters update for the hash: $parametersHash" }
// We should check that we sign the right data structure hash.
val newParametersHash = newParams.verifiedNetworkMapCert(networkMapClient.trustedRoot).serialize().hash
if (parametersHash == newParametersHash) {
// The latest parameters have priority.
newParams.serialize()
.open()
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
} else {
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map " +
"advertises update with hash $newParametersHash. Please check newest version")
}
}
}

View File

@ -4,12 +4,11 @@ import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import rx.Observable import rx.Observable
import rx.Scheduler import rx.Scheduler
import java.io.IOException import java.io.IOException
@ -118,7 +117,7 @@ class NodeInfoWatcher(private val nodePath: Path,
private fun processFile(file: Path): NodeInfo? { private fun processFile(file: Path): NodeInfo? {
return try { return try {
logger.info("Reading NodeInfo from file: $file") logger.info("Reading NodeInfo from file: $file")
val signedData = file.readAll().deserialize<SignedNodeInfo>() val signedData = file.readObject<SignedNodeInfo>()
signedData.verified() signedData.verified()
} catch (e: Exception) { } catch (e: Exception) {
logger.warn("Exception parsing NodeInfo from file. $file", e) logger.warn("Exception parsing NodeInfo from file. $file", e)

View File

@ -57,7 +57,7 @@ class NetworkMapClientTest {
val nodeInfoHash = nodeInfo.serialize().sha256() val nodeInfoHash = nodeInfo.serialize().sha256()
assertThat(networkMapClient.getNetworkMap().networkMap.nodeInfoHashes).containsExactly(nodeInfoHash) assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash)
assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash)) assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash))
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME) val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME)
@ -65,7 +65,7 @@ class NetworkMapClientTest {
networkMapClient.publish(signedNodeInfo2) networkMapClient.publish(signedNodeInfo2)
val nodeInfoHash2 = nodeInfo2.serialize().sha256() val nodeInfoHash2 = nodeInfo2.serialize().sha256()
assertThat(networkMapClient.getNetworkMap().networkMap.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2) assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2)
assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge) assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge)
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2)) assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
} }

View File

@ -15,17 +15,13 @@ import net.corda.core.internal.*
import net.corda.core.messaging.ParametersUpdateInfo import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.millis import net.corda.core.utilities.millis
import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME import net.corda.nodeapi.internal.network.*
import net.corda.nodeapi.internal.network.NetworkMap
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.* import net.corda.testing.core.*
import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.DEV_ROOT_CA
@ -197,7 +193,7 @@ class NetworkMapUpdaterTest {
@Test @Test
fun `emit new parameters update info on parameters update from network map`() { fun `emit new parameters update info on parameters update from network map`() {
val paramsFeed = updater.track() val paramsFeed = updater.trackParametersUpdate()
val snapshot = paramsFeed.snapshot val snapshot = paramsFeed.snapshot
val updates = paramsFeed.updates.bufferUntilSubscribed() val updates = paramsFeed.updates.bufferUntilSubscribed()
assertEquals(null, snapshot) assertEquals(null, snapshot)
@ -229,7 +225,7 @@ class NetworkMapUpdaterTest {
updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair)}) updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair)})
verify(networkMapClient).ackNetworkParametersUpdate(any()) verify(networkMapClient).ackNetworkParametersUpdate(any())
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
val signedNetworkParams = updateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>() val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate) val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile) assertEquals(newParameters, paramsFromFile)
} }

View File

@ -2,15 +2,13 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import net.corda.core.internal.* import net.corda.core.internal.createDirectories
import net.corda.core.node.NetworkParameters import net.corda.core.internal.div
import net.corda.core.serialization.deserialize import net.corda.core.internal.exists
import net.corda.core.internal.readObject
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.internal.NetworkParametersReader import net.corda.node.internal.NetworkParametersReader
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME import net.corda.nodeapi.internal.network.*
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.PortAllocation
@ -57,7 +55,9 @@ class NetworkParametersReaderTest {
assertFalse((baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME).exists()) assertFalse((baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME).exists())
assertEquals(server.networkParameters, parameters) assertEquals(server.networkParameters, parameters)
// Parameters from update should be moved to `network-parameters` file. // Parameters from update should be moved to `network-parameters` file.
val parametersFromFile = (baseDirectory / NETWORK_PARAMS_FILE_NAME).readAll().deserialize<SignedDataWithCert<NetworkParameters>>().verifiedNetworkMapCert(DEV_ROOT_CA.certificate) val parametersFromFile = (baseDirectory / NETWORK_PARAMS_FILE_NAME)
.readObject<SignedNetworkParameters>()
.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
assertEquals(server.networkParameters, parametersFromFile) assertEquals(server.networkParameters, parametersFromFile)
} }
} }

View File

@ -18,7 +18,6 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.serialization.deserialize
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
@ -484,7 +483,7 @@ class DriverDSLImpl(
val nodeInfoFile = config.corda.baseDirectory.list { paths -> val nodeInfoFile = config.corda.baseDirectory.list { paths ->
paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get() paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
} }
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified() val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating) NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
} }
} }