mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
CORDA-1048 - Making it simpler to move an existing local deployment of nodes to across different machines (#2697)
* Various cleanup of the network map code (#2604) (cherry picked from commit2af0fee
) * CORDA-1048: Making it simpler to move an existing local deployment of nodes to across different machines. (#2672) This was achieved by having the hash in the node-info file to be just of the node's X.500 name. This also solves existing duplicate node-info file issues that we've been having. Also updated the docsite. (cherry picked from commit8616f24
)
This commit is contained in:
parent
b76556940a
commit
bf712a893e
@ -142,6 +142,8 @@ inline fun <R> Path.readLines(charset: Charset = UTF_8, block: (Stream<String>)
|
||||
fun Path.readAllLines(charset: Charset = UTF_8): List<String> = Files.readAllLines(this, charset)
|
||||
fun Path.writeLines(lines: Iterable<CharSequence>, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options)
|
||||
|
||||
inline fun <reified T : Any> Path.readObject(): T = readAll().deserialize()
|
||||
|
||||
fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options)
|
||||
|
||||
fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "…"
|
||||
@ -372,9 +374,7 @@ inline fun <T : Any> SerializedBytes<T>.sign(signer: (SerializedBytes<T>) -> Dig
|
||||
return SignedData(this, signer(this))
|
||||
}
|
||||
|
||||
inline fun <T : Any> SerializedBytes<T>.sign(keyPair: KeyPair): SignedData<T> {
|
||||
return SignedData(this, keyPair.sign(this.bytes))
|
||||
}
|
||||
fun <T : Any> SerializedBytes<T>.sign(keyPair: KeyPair): SignedData<T> = SignedData(this, keyPair.sign(this.bytes))
|
||||
|
||||
/** Verifies that the correct notarisation request was signed by the counterparty. */
|
||||
fun NotaryFlow.Service.validateRequest(request: NotarisationRequest, signature: NotarisationRequestSignature) {
|
||||
@ -391,4 +391,4 @@ fun NotarisationRequest.generateSignature(serviceHub: ServiceHub): NotarisationR
|
||||
keyManagementService.sign(serializedRequest, myLegalIdentity)
|
||||
}
|
||||
return NotarisationRequestSignature(signature, serviceHub.myInfo.platformVersion)
|
||||
}
|
||||
}
|
||||
|
@ -139,8 +139,9 @@ class WireTransaction(componentGroups: List<ComponentGroup>, val privacySalt: Pr
|
||||
remainingTransactionSize -= size
|
||||
}
|
||||
|
||||
// Check attachment size first as they are most likely to go over the limit.
|
||||
ltx.attachments.forEach { minus(it.size) }
|
||||
// Check attachments size first as they are most likely to go over the limit. With ContractAttachment instances
|
||||
// it's likely that the same underlying Attachment CorDapp will occur more than once so we dedup on the attachment id.
|
||||
ltx.attachments.distinctBy { it.id }.forEach { minus(it.size) }
|
||||
minus(ltx.inputs.serialize().size)
|
||||
minus(ltx.commands.serialize().size)
|
||||
minus(ltx.outputs.serialize().size)
|
||||
|
@ -88,7 +88,7 @@ path to the node's base directory.
|
||||
:p2pAddress: The host and port on which the node is available for protocol operations over ArtemisMQ.
|
||||
|
||||
.. note:: In practice the ArtemisMQ messaging services bind to all local addresses on the specified port. However,
|
||||
note that the host is the included as the advertised entry in the NetworkMapService. As a result the value listed
|
||||
note that the host is the included as the advertised entry in the network map. As a result the value listed
|
||||
here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
|
||||
the node will try to auto-discover its public one.
|
||||
|
||||
|
@ -1,8 +1,6 @@
|
||||
Creating nodes locally
|
||||
======================
|
||||
|
||||
.. contents::
|
||||
|
||||
Node structure
|
||||
--------------
|
||||
Each Corda node has the following structure:
|
||||
@ -70,8 +68,8 @@ The name must also obey the following constraints:
|
||||
The Cordform task
|
||||
-----------------
|
||||
Corda provides a gradle plugin called ``Cordform`` that allows you to automatically generate and configure a set of
|
||||
nodes. Here is an example ``Cordform`` task called ``deployNodes`` that creates three nodes, defined in the
|
||||
`Kotlin CorDapp Template <https://github.com/corda/cordapp-template-kotlin/blob/release-V3/build.gradle#L100>`_:
|
||||
nodes for testing and demos. Here is an example ``Cordform`` task called ``deployNodes`` that creates three nodes, defined
|
||||
in the `Kotlin CorDapp Template <https://github.com/corda/cordapp-template-kotlin/blob/release-V3/build.gradle#L100>`_:
|
||||
|
||||
.. sourcecode:: groovy
|
||||
|
||||
@ -165,8 +163,7 @@ a single node to run the network map service, by putting its name in the ``netwo
|
||||
.. warning:: When adding nodes, make sure that there are no port clashes!
|
||||
|
||||
The Dockerform task
|
||||
-------------------
|
||||
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
The ```Dockerform``` is a sister task of ```Cordform```. It has nearly the same syntax and produces very
|
||||
similar results - enhanced by an extra file to enable easy spin up of nodes using ```docker-compose```.
|
||||
Below you can find the example task from the ```IRS Demo<https://github.com/corda/corda/blob/release-V3.0/samples/irs-demo/cordapp/build.gradle#L111>```
|
||||
@ -230,7 +227,7 @@ by default exposes port 10003 which is the default one for RPC connections.
|
||||
|
||||
|
||||
Running deployNodes
|
||||
-------------------
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
To create the nodes defined in our ``deployNodes`` task, run the following command in a terminal window from the root
|
||||
of the project where the ``deployNodes`` task is defined:
|
||||
|
||||
|
@ -21,7 +21,7 @@ service.
|
||||
task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) {
|
||||
directory "./build/nodes"
|
||||
node {
|
||||
name "O=NetworkMapAndNotary,L=London,C=GB"
|
||||
name "O=Notary,L=London,C=GB"
|
||||
notary = [validating : true]
|
||||
p2pPort 10002
|
||||
rpcPort 10003
|
||||
@ -142,7 +142,7 @@ The vaults of PartyA and PartyB should both display the following output:
|
||||
- "C=GB,L=London,O=PartyA"
|
||||
- "C=US,L=New York,O=PartyB"
|
||||
contract: "com.template.contract.IOUContract"
|
||||
notary: "C=GB,L=London,O=NetworkMapAndNotary,CN=corda.notary.validating"
|
||||
notary: "C=GB,L=London,O=Notary"
|
||||
encumbrance: null
|
||||
constraint:
|
||||
attachmentId: "F578320232CAB87BB1E919F3E5DB9D81B7346F9D7EA6D9155DC0F7BA8E472552"
|
||||
@ -157,7 +157,7 @@ The vaults of PartyA and PartyB should both display the following output:
|
||||
recordedTime: 1506415268.875000000
|
||||
consumedTime: null
|
||||
status: "UNCONSUMED"
|
||||
notary: "C=GB,L=London,O=NetworkMapAndNotary,CN=corda.notary.validating"
|
||||
notary: "C=GB,L=London,O=Notary"
|
||||
lockId: null
|
||||
lockUpdateTime: 1506415269.548000000
|
||||
totalStatesAvailable: -1
|
||||
|
@ -57,9 +57,9 @@ in its local network map cache. The node generates its own node-info file on sta
|
||||
In addition to the network map, all the nodes on a network must use the same set of network parameters. These are a set
|
||||
of constants which guarantee interoperability between nodes. The HTTP network map distributes the network parameters
|
||||
which the node downloads automatically. In the absence of this the network parameters must be generated locally. This can
|
||||
be done with the network bootstrapper. This a tool that scans all the node configurations from a common directory to
|
||||
be done with the network bootstrapper. This is a tool that scans all the node configurations from a common directory to
|
||||
generate the network parameters file which is copied to the nodes' directories. It also copies each node's node-info file
|
||||
to every other node.
|
||||
to every other node so that they can all transact with each other.
|
||||
|
||||
The bootstrapper tool can be built with the command:
|
||||
|
||||
@ -82,6 +82,12 @@ For example running the command on a directory containing these files :
|
||||
|
||||
Would generate directories containing three nodes: notary, partya and partyb.
|
||||
|
||||
This tool only bootstraps a network. It cannot dynamically update if a new node needs to join the network or if an existing
|
||||
one has changed something in their node-info, e.g. their P2P address. For this the new node-info file will need to be placed
|
||||
in the other nodes' ``additional-node-infos`` directory. A simple way to do this is to use `rsync <https://en.wikipedia.org/wiki/Rsync>`_.
|
||||
However, if it's known beforehand the set of nodes that will eventually the node folders can be pregenerated in the bootstrap
|
||||
and only started when needed.
|
||||
|
||||
Starting the nodes
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -17,7 +17,7 @@ if:
|
||||
|
||||
We will deploy the CorDapp on 4 test nodes:
|
||||
|
||||
* **NetworkMapAndNotary**, which hosts a validating notary service
|
||||
* **Notary**, which hosts a validating notary service
|
||||
* **PartyA**
|
||||
* **PartyB**
|
||||
* **PartyC**
|
||||
@ -252,7 +252,7 @@ For each node, the ``runnodes`` script creates a node tab/window:
|
||||
|
||||
Fri Jul 07 10:33:47 BST 2017>>>
|
||||
|
||||
For every node except the network map/notary, the script also creates a webserver terminal tab/window:
|
||||
For every node except the notary, the script also creates a webserver terminal tab/window:
|
||||
|
||||
.. sourcecode:: none
|
||||
|
||||
@ -471,23 +471,27 @@ For more information on the client RPC interface and how to build an RPC client
|
||||
|
||||
Running Nodes Across Machines
|
||||
-----------------------------
|
||||
The nodes can be split across machines and configured to communicate across the network.
|
||||
The nodes can be split across different machines and configured to communicate across the network.
|
||||
|
||||
After deploying the nodes, navigate to the build folder (``kotlin-source/build/nodes``) and move some of the individual
|
||||
node folders to a different machine (e.g. using a USB key). It is important that none of the nodes - including the
|
||||
network map/notary node - end up on more than one machine. Each computer should also have a copy of ``runnodes`` and
|
||||
``runnodes.bat``.
|
||||
After deploying the nodes, navigate to the build folder (``kotlin-source/build/nodes``) and for each node that needs to
|
||||
be moved to another machine open its config file and change the Artemis messaging address to the IP address of the machine
|
||||
where the node will run (e.g. ``p2pAddress="10.18.0.166:10006"``).
|
||||
|
||||
These changes require new node-info files to be distributed amongst the nodes. Use the network bootstrapper tool
|
||||
(see :doc:`setting-up-a-corda-network` for more information on this and how to built it) to update the files and have
|
||||
them distributed locally.
|
||||
|
||||
``java -jar network-bootstrapper.jar kotlin-source/build/nodes``
|
||||
|
||||
Once that's done move the node folders to their designated machines (e.g. using a USB key). It is important that none of the
|
||||
nodes - including the notary - end up on more than one machine. Each computer should also have a copy of ``runnodes``
|
||||
and ``runnodes.bat``.
|
||||
|
||||
For example, you may end up with the following layout:
|
||||
|
||||
* Machine 1: ``NetworkMapAndNotary``, ``PartyA``, ``runnodes``, ``runnodes.bat``
|
||||
* Machine 1: ``Notary``, ``PartyA``, ``runnodes``, ``runnodes.bat``
|
||||
* Machine 2: ``PartyB``, ``PartyC``, ``runnodes``, ``runnodes.bat``
|
||||
|
||||
You must now edit the configuration file for each node, including the network map/notary. Open each node's config file,
|
||||
and make the following changes:
|
||||
|
||||
* Change the Artemis messaging address to the machine's IP address (e.g. ``p2pAddress="10.18.0.166:10006"``)
|
||||
|
||||
After starting each node, the nodes will be able to see one another and agree IOUs among themselves.
|
||||
|
||||
Debugging your CorDapp
|
||||
|
@ -15,7 +15,6 @@ import java.security.SignatureException
|
||||
* A signed [NodeInfo] object containing a signature for each identity. The list of signatures is expected
|
||||
* to be in the same order as the identities.
|
||||
*/
|
||||
// TODO Move this to net.corda.nodeapi.internal.network
|
||||
// TODO Add signatures for composite keys. The current thinking is to make sure there is a signature for each leaf key
|
||||
// that the node owns. This check can only be done by the network map server as it can check with the doorman if a node
|
||||
// is part of a composite identity. This of course further requires the doorman being able to issue CSRs for composite
|
||||
@ -54,3 +53,13 @@ inline fun NodeInfo.sign(signer: (PublicKey, SerializedBytes<NodeInfo>) -> Digit
|
||||
val signatures = owningKeys.map { signer(it, serialised) }
|
||||
return SignedNodeInfo(serialised, signatures)
|
||||
}
|
||||
|
||||
/**
|
||||
* A container for a [SignedNodeInfo] and its cached [NodeInfo].
|
||||
*/
|
||||
class NodeInfoAndSigned private constructor(val nodeInfo: NodeInfo, val signed: SignedNodeInfo) {
|
||||
constructor(nodeInfo: NodeInfo, signer: (PublicKey, SerializedBytes<NodeInfo>) -> DigitalSignature) : this(nodeInfo, nodeInfo.sign(signer))
|
||||
constructor(signedNodeInfo: SignedNodeInfo) : this(signedNodeInfo.verified(), signedNodeInfo)
|
||||
operator fun component1(): NodeInfo = nodeInfo
|
||||
operator fun component2(): SignedNodeInfo = signed
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import com.google.common.hash.Hashing
|
||||
import com.google.common.hash.HashingInputStream
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.contracts.ContractClassName
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SecureHash.Companion.parse
|
||||
import net.corda.core.identity.Party
|
||||
@ -14,7 +15,6 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
@ -80,9 +80,9 @@ class NetworkBootstrapper {
|
||||
distributeNodeInfos(nodeDirs, nodeInfoFiles)
|
||||
println("Gathering notary identities")
|
||||
val notaryInfos = gatherNotaryInfos(nodeInfoFiles)
|
||||
println("Notary identities to be used in network-parameters file: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}")
|
||||
println("Notary identities to be used in network parameters: ${notaryInfos.joinToString("; ") { it.prettyPrint() }}")
|
||||
val mergedWhiteList = generateWhitelist(directory / WHITELIST_FILE_NAME, directory / EXCLUDE_WHITELIST_FILE_NAME, cordapps)
|
||||
println("Updating whitelist.")
|
||||
println("Updating whitelist")
|
||||
overwriteWhitelist(directory / WHITELIST_FILE_NAME, mergedWhiteList)
|
||||
installNetworkParameters(notaryInfos, nodeDirs, mergedWhiteList)
|
||||
println("Bootstrapping complete!")
|
||||
@ -164,7 +164,7 @@ class NetworkBootstrapper {
|
||||
if (nodeConfig.hasPath("notary")) {
|
||||
val validating = nodeConfig.getConfig("notary").getBoolean("validating")
|
||||
// And the node-info file contains the notary's identity
|
||||
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified()
|
||||
val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
|
||||
NotaryInfo(nodeInfo.notaryIdentity(), validating)
|
||||
} else {
|
||||
null
|
||||
@ -189,19 +189,21 @@ class NetworkBootstrapper {
|
||||
|
||||
private fun generateWhitelist(whitelistFile: Path, excludeWhitelistFile: Path, cordapps: List<String>?): Map<String, List<AttachmentId>> {
|
||||
val existingWhitelist = if (whitelistFile.exists()) readContractWhitelist(whitelistFile) else emptyMap()
|
||||
println("Found existing whitelist: $existingWhitelist")
|
||||
println("Found existing whitelist:")
|
||||
existingWhitelist.forEach { println(it.outputString()) }
|
||||
|
||||
val excludeContracts = if (excludeWhitelistFile.exists()) readExcludeWhitelist(excludeWhitelistFile) else emptyList()
|
||||
println("Exclude Contracts from whitelist: $excludeContracts")
|
||||
|
||||
val newWhiteList = cordapps?.flatMap { cordappJarPath ->
|
||||
val newWhiteList: Map<ContractClassName, AttachmentId> = cordapps?.flatMap { cordappJarPath ->
|
||||
val jarHash = getJarHash(cordappJarPath)
|
||||
scanJarForContracts(cordappJarPath).map { contract ->
|
||||
contract to jarHash
|
||||
}
|
||||
}?.filter { (contractClassName, _) -> contractClassName !in excludeContracts }?.toMap() ?: emptyMap()
|
||||
|
||||
println("Calculating whitelist for current cordapps: $newWhiteList")
|
||||
println("Calculating whitelist for current CorDapps:")
|
||||
newWhiteList.forEach { (contract, attachment) -> println("$contract:$attachment") }
|
||||
|
||||
val merged = (newWhiteList.keys + existingWhitelist.keys).map { contractClassName ->
|
||||
val existing = existingWhitelist[contractClassName] ?: emptyList()
|
||||
@ -209,15 +211,16 @@ class NetworkBootstrapper {
|
||||
contractClassName to (if (newHash == null || newHash in existing) existing else existing + newHash)
|
||||
}.toMap()
|
||||
|
||||
println("Final whitelist: $merged")
|
||||
println("Final whitelist:")
|
||||
merged.forEach { println(it.outputString()) }
|
||||
|
||||
return merged
|
||||
}
|
||||
|
||||
private fun overwriteWhitelist(whitelistFile: Path, mergedWhiteList: Map<String, List<AttachmentId>>) {
|
||||
PrintStream(whitelistFile.toFile().outputStream()).use { out ->
|
||||
mergedWhiteList.forEach { (contract, attachments) ->
|
||||
out.println("${contract}:${attachments.joinToString(",")}")
|
||||
mergedWhiteList.forEach {
|
||||
out.println(it.outputString())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -240,15 +243,17 @@ class NetworkBootstrapper {
|
||||
|
||||
private fun NodeInfo.notaryIdentity(): Party {
|
||||
return when (legalIdentities.size) {
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
|
||||
1 -> legalIdentities[0]
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
|
||||
// cluster and is shared by all the other members. This is the notary identity.
|
||||
2 -> legalIdentities[1]
|
||||
else -> throw IllegalArgumentException("Not sure how to get the notary identity in this scenerio: $this")
|
||||
}
|
||||
}
|
||||
|
||||
private fun Map.Entry<ContractClassName, List<AttachmentId>>.outputString() = "$key:${value.joinToString(",")}"
|
||||
|
||||
// We need to to set serialization env, because generation of parameters is run from Cordform.
|
||||
// KryoServerSerializationScheme is not accessible from nodeapi.
|
||||
private fun initialiseSerialization() {
|
||||
|
@ -14,6 +14,9 @@ import java.time.Instant
|
||||
const val NETWORK_PARAMS_FILE_NAME = "network-parameters"
|
||||
const val NETWORK_PARAMS_UPDATE_FILE_NAME = "network-parameters-update"
|
||||
|
||||
typealias SignedNetworkMap = SignedDataWithCert<NetworkMap>
|
||||
typealias SignedNetworkParameters = SignedDataWithCert<NetworkParameters>
|
||||
|
||||
/**
|
||||
* Data structure representing the network map available from the HTTP network map service as a serialised blob.
|
||||
* @property nodeInfoHashes list of network participant's [NodeInfo] hashes
|
||||
|
@ -2,18 +2,20 @@ package net.corda.node.services.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.readObject
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.driver.internal.RandomFree
|
||||
@ -65,8 +67,7 @@ class NetworkMapTest {
|
||||
) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val networkParameters = (alice.baseDirectory / NETWORK_PARAMS_FILE_NAME)
|
||||
.readAll()
|
||||
.deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
.readObject<SignedNetworkParameters>()
|
||||
.verified()
|
||||
// We use a random modified time above to make the network parameters unqiue so that we're sure they came
|
||||
// from the server
|
||||
|
@ -7,7 +7,7 @@ import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
@ -39,8 +39,7 @@ class NodeInfoWatcherTest {
|
||||
private val scheduler = TestScheduler()
|
||||
private val testSubscriber = TestSubscriber<NodeInfo>()
|
||||
|
||||
private lateinit var nodeInfo: NodeInfo
|
||||
private lateinit var signedNodeInfo: SignedNodeInfo
|
||||
private lateinit var nodeInfoAndSigned: NodeInfoAndSigned
|
||||
private lateinit var nodeInfoPath: Path
|
||||
private lateinit var keyManagementService: KeyManagementService
|
||||
|
||||
@ -49,9 +48,7 @@ class NodeInfoWatcherTest {
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
val nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
|
||||
nodeInfo = nodeInfoAndSigned.first
|
||||
signedNodeInfo = nodeInfoAndSigned.second
|
||||
nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
|
||||
val identityService = makeTestIdentityService()
|
||||
keyManagementService = MockKeyManagementService(identityService)
|
||||
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
|
||||
@ -62,7 +59,7 @@ class NodeInfoWatcherTest {
|
||||
fun `save a NodeInfo`() {
|
||||
assertEquals(0,
|
||||
tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
|
||||
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), signedNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfoAndSigned)
|
||||
|
||||
val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
|
||||
assertEquals(1, nodeInfoFiles.size)
|
||||
@ -76,8 +73,8 @@ class NodeInfoWatcherTest {
|
||||
@Test
|
||||
fun `save a NodeInfo to JimFs`() {
|
||||
val jimFs = Jimfs.newFileSystem(Configuration.unix())
|
||||
val jimFolder = jimFs.getPath("/nodeInfo")
|
||||
NodeInfoWatcher.saveToFile(jimFolder, signedNodeInfo)
|
||||
val jimFolder = jimFs.getPath("/nodeInfo").createDirectories()
|
||||
NodeInfoWatcher.saveToFile(jimFolder, nodeInfoAndSigned)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -104,7 +101,7 @@ class NodeInfoWatcherTest {
|
||||
try {
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
assertEquals(1, readNodes.size)
|
||||
assertEquals(nodeInfo, readNodes.first())
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
|
||||
} finally {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
@ -129,7 +126,7 @@ class NodeInfoWatcherTest {
|
||||
testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS)
|
||||
// The same folder can be reported more than once, so take unique values.
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
assertEquals(nodeInfo, readNodes.first())
|
||||
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
|
||||
} finally {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
@ -141,6 +138,6 @@ class NodeInfoWatcherTest {
|
||||
|
||||
// Write a nodeInfo under the right path.
|
||||
private fun createNodeInfoFileInPath() {
|
||||
NodeInfoWatcher.saveToFile(nodeInfoPath, signedNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfoAndSigned)
|
||||
}
|
||||
}
|
||||
|
@ -61,11 +61,11 @@ import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.shell.InteractiveShell
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
import net.corda.nodeapi.internal.sign
|
||||
import net.corda.nodeapi.internal.storeLegalIdentity
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
|
||||
@ -180,11 +180,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
nodeInfo
|
||||
}
|
||||
}
|
||||
@ -265,11 +265,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
configuration.baseDirectory)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
networkMapUpdater.updateNodeInfo(services.myInfo) {
|
||||
it.sign { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
log.info("Node-info for this node: ${services.myInfo}")
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
|
||||
networkMapUpdater.subscribeToNetworkMap()
|
||||
|
||||
// If we successfully loaded network data from database, we set this future to Unit.
|
||||
|
@ -53,7 +53,7 @@ internal class CordaRPCOpsImpl(
|
||||
}
|
||||
|
||||
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
return services.networkMapUpdater.track()
|
||||
return services.networkMapUpdater.trackParametersUpdate()
|
||||
}
|
||||
|
||||
override fun acceptNewNetworkParameters(parametersHash: SecureHash) {
|
||||
|
@ -3,12 +3,12 @@ package net.corda.node.internal
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.network.NetworkMapClient
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
@ -26,9 +26,9 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
|
||||
val networkParameters by lazy { retrieveNetworkParameters() }
|
||||
|
||||
private fun retrieveNetworkParameters(): NetworkParameters {
|
||||
val advertisedParametersHash = networkMapClient?.getNetworkMap()?.networkMap?.networkParameterHash
|
||||
val advertisedParametersHash = networkMapClient?.getNetworkMap()?.payload?.networkParameterHash
|
||||
val signedParametersFromFile = if (networkParamsFile.exists()) {
|
||||
networkParamsFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
networkParamsFile.readObject<SignedNetworkParameters>()
|
||||
} else {
|
||||
null
|
||||
}
|
||||
@ -51,13 +51,13 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
|
||||
return parameters
|
||||
}
|
||||
|
||||
private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedDataWithCert<NetworkParameters> {
|
||||
private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedNetworkParameters {
|
||||
if (!parametersUpdateFile.exists()) {
|
||||
throw IllegalArgumentException("Node uses parameters with hash: $previousParametersHash " +
|
||||
"but network map is advertising: ${advertisedParametersHash}.\n" +
|
||||
"but network map is advertising: $advertisedParametersHash.\n" +
|
||||
"Please update node to use correct network parameters file.")
|
||||
}
|
||||
val signedUpdatedParameters = parametersUpdateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
val signedUpdatedParameters = parametersUpdateFile.readObject<SignedNetworkParameters>()
|
||||
if (signedUpdatedParameters.raw.hash != advertisedParametersHash) {
|
||||
throw IllegalArgumentException("Both network parameters and network parameters update files don't match" +
|
||||
"parameters advertised by network map.\n" +
|
||||
|
@ -1,43 +1,27 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.registration.cacheControl
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import java.io.BufferedReader
|
||||
import java.io.Closeable
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.security.cert.X509Certificate
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
|
||||
|
||||
fun publish(signedNodeInfo: SignedNodeInfo) {
|
||||
@ -57,10 +41,13 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
|
||||
fun getNetworkMap(): NetworkMapResponse {
|
||||
logger.trace { "Fetching network map update from $networkMapUrl." }
|
||||
val connection = networkMapUrl.openHttpConnection()
|
||||
val signedNetworkMap = connection.responseAs<SignedDataWithCert<NetworkMap>>()
|
||||
val signedNetworkMap = connection.responseAs<SignedNetworkMap>()
|
||||
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustedRoot)
|
||||
val timeout = connection.cacheControl().maxAgeSeconds().seconds
|
||||
logger.trace { "Fetched network map update from $networkMapUrl successfully, retrieved ${networkMap.nodeInfoHashes.size} node info hashes. Node Info hashes: ${networkMap.nodeInfoHashes.joinToString("\n")}" }
|
||||
logger.trace {
|
||||
"Fetched network map update from $networkMapUrl successfully, retrieved ${networkMap.nodeInfoHashes.size} " +
|
||||
"node info hashes. Node Info hashes:\n${networkMap.nodeInfoHashes.joinToString("\n")}"
|
||||
}
|
||||
return NetworkMapResponse(networkMap, timeout)
|
||||
}
|
||||
|
||||
@ -72,10 +59,10 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
|
||||
return verifiedNodeInfo
|
||||
}
|
||||
|
||||
fun getNetworkParameters(networkParameterHash: SecureHash): SignedDataWithCert<NetworkParameters> {
|
||||
fun getNetworkParameters(networkParameterHash: SecureHash): SignedNetworkParameters {
|
||||
val url = URL("$networkMapUrl/network-parameters/$networkParameterHash")
|
||||
logger.trace { "Fetching network parameters: '$networkParameterHash' from $url." }
|
||||
val networkParameter = url.openHttpConnection().responseAs<SignedDataWithCert<NetworkParameters>>()
|
||||
val networkParameter = url.openHttpConnection().responseAs<SignedNetworkParameters>()
|
||||
logger.trace { "Fetched network parameters: '$networkParameterHash' successfully. Network Parameters: $networkParameter" }
|
||||
return networkParameter
|
||||
}
|
||||
@ -89,143 +76,4 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
|
||||
}
|
||||
}
|
||||
|
||||
data class NetworkMapResponse(val networkMap: NetworkMap, val cacheMaxAge: Duration)
|
||||
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val currentParametersHash: SecureHash,
|
||||
private val baseDirectory: Path) : Closeable {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
private val retryInterval = 1.minutes
|
||||
}
|
||||
|
||||
private var newNetworkParameters: Pair<ParametersUpdate, SignedDataWithCert<NetworkParameters>>? = null
|
||||
|
||||
fun track(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
val currentUpdateInfo = newNetworkParameters?.let {
|
||||
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
|
||||
}
|
||||
return DataFeed(
|
||||
currentUpdateInfo,
|
||||
parametersUpdatesTrack
|
||||
)
|
||||
}
|
||||
|
||||
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
|
||||
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
|
||||
private var fileWatcherSubscription: Subscription? = null
|
||||
|
||||
override fun close() {
|
||||
fileWatcherSubscription?.unsubscribe()
|
||||
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedNodeInfo) {
|
||||
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
|
||||
// Compare node info without timestamp.
|
||||
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
|
||||
|
||||
// Only publish and write to disk if there are changes to the node info.
|
||||
val signedNodeInfo = signNodeInfo(newInfo)
|
||||
networkMapCache.addNode(newInfo)
|
||||
fileWatcher.saveToFile(signedNodeInfo)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient)
|
||||
}
|
||||
}
|
||||
|
||||
fun subscribeToNetworkMap() {
|
||||
require(fileWatcherSubscription == null) { "Should not call this method twice." }
|
||||
// Subscribe to file based networkMap
|
||||
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
|
||||
|
||||
if (networkMapClient == null) return
|
||||
// Subscribe to remote network map if configured.
|
||||
val task = object : Runnable {
|
||||
override fun run() {
|
||||
val nextScheduleDelay = try {
|
||||
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(it) }
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
// TODO This needs special handling (node omitted update process/didn't accept new parameters or didn't restart on updateDeadline)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Please update node to use correct network parameters file.\"")
|
||||
System.exit(1)
|
||||
}
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
val hashesFromNetworkMap = networkMap.nodeInfoHashes
|
||||
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
|
||||
// Download new node info from network map
|
||||
try {
|
||||
networkMapClient.getNodeInfo(it)
|
||||
} catch (e: Exception) {
|
||||
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
|
||||
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
|
||||
null
|
||||
}
|
||||
}.forEach {
|
||||
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
|
||||
networkMapCache.addNode(it)
|
||||
}
|
||||
// Remove node info from network map.
|
||||
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
|
||||
.mapNotNull(networkMapCache::getNodeByHash)
|
||||
.forEach(networkMapCache::removeNode)
|
||||
cacheTimeout
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while updating network map, will retry in ${retryInterval.seconds} seconds", t)
|
||||
retryInterval
|
||||
}
|
||||
// Schedule the next update.
|
||||
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
executor.submit(task) // The check may be expensive, so always run it in the background even the first time.
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
val task = object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while publishing node info, will retry in ${retryInterval.seconds} seconds.", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, retryInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
}
|
||||
executor.submit(task)
|
||||
}
|
||||
|
||||
private fun handleUpdateNetworkParameters(update: ParametersUpdate) {
|
||||
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already.
|
||||
return
|
||||
}
|
||||
val newParameters = networkMapClient?.getNetworkParameters(update.newParametersHash)
|
||||
if (newParameters != null) {
|
||||
logger.info("Downloaded new network parameters: $newParameters from the update: $update")
|
||||
newNetworkParameters = Pair(update, newParameters)
|
||||
parametersUpdatesTrack.onNext(ParametersUpdateInfo(update.newParametersHash, newParameters.verifiedNetworkMapCert(networkMapClient!!.trustedRoot), update.description, update.updateDeadline))
|
||||
}
|
||||
}
|
||||
|
||||
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
|
||||
networkMapClient ?: throw IllegalStateException("Network parameters updates are not support without compatibility zone configured")
|
||||
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them. Add persisting of newest parameters from update.
|
||||
val (_, newParams) = newNetworkParameters ?: throw IllegalArgumentException("Couldn't find parameters update for the hash: $parametersHash")
|
||||
val newParametersHash = newParams.verifiedNetworkMapCert(networkMapClient.trustedRoot).serialize().hash // We should check that we sign the right data structure hash.
|
||||
if (parametersHash == newParametersHash) {
|
||||
// The latest parameters have priority.
|
||||
newParams.serialize()
|
||||
.open()
|
||||
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
|
||||
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
||||
} else {
|
||||
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map advertises update with hash $newParametersHash. Please check newest version")
|
||||
}
|
||||
}
|
||||
}
|
||||
data class NetworkMapResponse(val payload: NetworkMap, val cacheMaxAge: Duration)
|
||||
|
@ -0,0 +1,179 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.copyTo
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val currentParametersHash: SecureHash,
|
||||
private val baseDirectory: Path
|
||||
) : AutoCloseable {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
private val defaultRetryInterval = 1.minutes
|
||||
}
|
||||
|
||||
private val parametersUpdatesTrack: PublishSubject<ParametersUpdateInfo> = PublishSubject.create<ParametersUpdateInfo>()
|
||||
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
|
||||
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
|
||||
private var fileWatcherSubscription: Subscription? = null
|
||||
|
||||
override fun close() {
|
||||
fileWatcherSubscription?.unsubscribe()
|
||||
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun trackParametersUpdate(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
|
||||
val currentUpdateInfo = newNetworkParameters?.let {
|
||||
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
|
||||
}
|
||||
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
|
||||
}
|
||||
|
||||
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
|
||||
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
|
||||
// Compare node info without timestamp.
|
||||
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
|
||||
|
||||
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
|
||||
// Only publish and write to disk if there are changes to the node info.
|
||||
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
|
||||
fileWatcher.saveToFile(nodeInfoAndSigned)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fun subscribeToNetworkMap() {
|
||||
require(fileWatcherSubscription == null) { "Should not call this method twice." }
|
||||
// Subscribe to file based networkMap
|
||||
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
|
||||
|
||||
if (networkMapClient == null) return
|
||||
|
||||
// Subscribe to remote network map if configured.
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
val nextScheduleDelay = try {
|
||||
updateNetworkMapCache(networkMapClient)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while updating network map, will retry in $defaultRetryInterval", t)
|
||||
defaultRetryInterval
|
||||
}
|
||||
// Schedule the next update.
|
||||
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}) // The check may be expensive, so always run it in the background even the first time.
|
||||
}
|
||||
|
||||
private fun updateNetworkMapCache(networkMapClient: NetworkMapClient): Duration {
|
||||
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
|
||||
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
// TODO This needs special handling (node omitted update process/didn't accept new parameters or didn't restart on updateDeadline)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Please update node to use correct network parameters file.\"")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
val hashesFromNetworkMap = networkMap.nodeInfoHashes
|
||||
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
|
||||
// Download new node info from network map
|
||||
try {
|
||||
networkMapClient.getNodeInfo(it)
|
||||
} catch (e: Exception) {
|
||||
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
|
||||
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
|
||||
null
|
||||
}
|
||||
}.forEach {
|
||||
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
|
||||
networkMapCache.addNode(it)
|
||||
}
|
||||
|
||||
// Remove node info from network map.
|
||||
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
|
||||
.mapNotNull(networkMapCache::getNodeByHash)
|
||||
.forEach(networkMapCache::removeNode)
|
||||
|
||||
return cacheTimeout
|
||||
}
|
||||
|
||||
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
|
||||
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
|
||||
// This update was handled already.
|
||||
return
|
||||
}
|
||||
val newParameters = networkMapClient.getNetworkParameters(update.newParametersHash)
|
||||
logger.info("Downloaded new network parameters: $newParameters from the update: $update")
|
||||
newNetworkParameters = Pair(update, newParameters)
|
||||
val updateInfo = ParametersUpdateInfo(
|
||||
update.newParametersHash,
|
||||
newParameters.verifiedNetworkMapCert(networkMapClient.trustedRoot),
|
||||
update.description,
|
||||
update.updateDeadline)
|
||||
parametersUpdatesTrack.onNext(updateInfo)
|
||||
}
|
||||
|
||||
fun acceptNewNetworkParameters(parametersHash: SecureHash, sign: (SecureHash) -> SignedData<SecureHash>) {
|
||||
networkMapClient ?: throw IllegalStateException("Network parameters updates are not support without compatibility zone configured")
|
||||
// TODO This scenario will happen if node was restarted and didn't download parameters yet, but we accepted them.
|
||||
// Add persisting of newest parameters from update.
|
||||
val (_, newParams) = requireNotNull(newNetworkParameters) { "Couldn't find parameters update for the hash: $parametersHash" }
|
||||
// We should check that we sign the right data structure hash.
|
||||
val newParametersHash = newParams.verifiedNetworkMapCert(networkMapClient.trustedRoot).serialize().hash
|
||||
if (parametersHash == newParametersHash) {
|
||||
// The latest parameters have priority.
|
||||
newParams.serialize()
|
||||
.open()
|
||||
.copyTo(baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME, StandardCopyOption.REPLACE_EXISTING)
|
||||
networkMapClient.ackNetworkParametersUpdate(sign(parametersHash))
|
||||
} else {
|
||||
throw IllegalArgumentException("Refused to accept parameters with hash $parametersHash because network map " +
|
||||
"advertises update with hash $newParametersHash. Please check newest version")
|
||||
}
|
||||
}
|
||||
}
|
@ -4,18 +4,26 @@ import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal._contextSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.stream.Stream
|
||||
import kotlin.streams.toList
|
||||
|
||||
/**
|
||||
@ -32,34 +40,29 @@ import kotlin.streams.toList
|
||||
class NodeInfoWatcher(private val nodePath: Path,
|
||||
private val scheduler: Scheduler,
|
||||
private val pollInterval: Duration = 5.seconds) {
|
||||
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
private val processedNodeInfoFiles = mutableSetOf<Path>()
|
||||
private val _processedNodeInfoHashes = mutableSetOf<SecureHash>()
|
||||
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes.toSet()
|
||||
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
/**
|
||||
* Saves the given [NodeInfo] to a path.
|
||||
* The node is 'encoded' as a SignedNodeInfo, signed with the owning key of its first identity.
|
||||
* The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename
|
||||
* is used so that one can freely copy these files without fearing to overwrite another one.
|
||||
*
|
||||
* @param path the path where to write the file, if non-existent it will be created.
|
||||
* @param signedNodeInfo the signed NodeInfo.
|
||||
*/
|
||||
fun saveToFile(path: Path, signedNodeInfo: SignedNodeInfo) {
|
||||
try {
|
||||
path.createDirectories()
|
||||
signedNodeInfo.serialize()
|
||||
.open()
|
||||
.copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${signedNodeInfo.raw.hash}")
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Couldn't write node info to file", e)
|
||||
}
|
||||
|
||||
// TODO This method doesn't belong in this class
|
||||
fun saveToFile(path: Path, nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
// By using the hash of the node's first name we ensure:
|
||||
// 1) node info files for the same node map to the same filename and thus avoid having duplicate files for
|
||||
// the same node
|
||||
// 2) avoid having to deal with characters in the X.500 name which are incompatible with the local filesystem
|
||||
val fileNameHash = nodeInfoAndSigned.nodeInfo.legalIdentities[0].name.serialize().hash
|
||||
nodeInfoAndSigned
|
||||
.signed
|
||||
.serialize()
|
||||
.open()
|
||||
.copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}$fileNameHash", REPLACE_EXISTING)
|
||||
}
|
||||
}
|
||||
|
||||
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
|
||||
|
||||
private val _processedNodeInfoHashes = HashSet<SecureHash>()
|
||||
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes
|
||||
|
||||
init {
|
||||
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
|
||||
if (!nodeInfoDirectory.isDirectory()) {
|
||||
@ -85,7 +88,10 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
.flatMapIterable { loadFromDirectory() }
|
||||
}
|
||||
|
||||
fun saveToFile(signedNodeInfo: SignedNodeInfo) = Companion.saveToFile(nodePath, signedNodeInfo)
|
||||
// TODO This method doesn't belong in this class
|
||||
fun saveToFile(nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
return Companion.saveToFile(nodePath, nodeInfoAndSigned)
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
|
||||
@ -98,16 +104,15 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
return emptyList()
|
||||
}
|
||||
val result = nodeInfoDirectory.list { paths ->
|
||||
paths.filter { it !in processedNodeInfoFiles }
|
||||
paths
|
||||
.filter { it.isRegularFile() }
|
||||
.map { path ->
|
||||
processFile(path)?.apply {
|
||||
processedNodeInfoFiles.add(path)
|
||||
_processedNodeInfoHashes.add(this.serialize().hash)
|
||||
.flatMap { path ->
|
||||
val nodeInfo = processFile(path)?.let {
|
||||
if (_processedNodeInfoHashes.add(it.signed.raw.hash)) it.nodeInfo else null
|
||||
}
|
||||
if (nodeInfo != null) Stream.of(nodeInfo) else Stream.empty()
|
||||
}
|
||||
.toList()
|
||||
.filterNotNull()
|
||||
}
|
||||
if (result.isNotEmpty()) {
|
||||
logger.info("Successfully read ${result.size} NodeInfo files from disk.")
|
||||
@ -115,14 +120,25 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
return result
|
||||
}
|
||||
|
||||
private fun processFile(file: Path): NodeInfo? {
|
||||
private fun processFile(file: Path): NodeInfoAndSigned? {
|
||||
return try {
|
||||
logger.info("Reading NodeInfo from file: $file")
|
||||
val signedData = file.readAll().deserialize<SignedNodeInfo>()
|
||||
signedData.verified()
|
||||
val signedNodeInfo = file.readObject<SignedNodeInfo>()
|
||||
NodeInfoAndSigned(signedNodeInfo)
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Exception parsing NodeInfo from file. $file", e)
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Remove this once we have a tool that can read AMQP serialised files
|
||||
fun main(args: Array<String>) {
|
||||
_contextSerializationEnv.set(SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(AMQPServerSerializationScheme())
|
||||
},
|
||||
AMQP_P2P_CONTEXT)
|
||||
)
|
||||
println(Paths.get(args[0]).readObject<SignedNodeInfo>().verified())
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ class NetworkMapClientTest {
|
||||
|
||||
val nodeInfoHash = nodeInfo.serialize().sha256()
|
||||
|
||||
assertThat(networkMapClient.getNetworkMap().networkMap.nodeInfoHashes).containsExactly(nodeInfoHash)
|
||||
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash)
|
||||
assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash))
|
||||
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME)
|
||||
@ -65,7 +65,7 @@ class NetworkMapClientTest {
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
|
||||
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
|
||||
assertThat(networkMapClient.getNetworkMap().networkMap.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2)
|
||||
assertThat(networkMapClient.getNetworkMap().payload.nodeInfoHashes).containsExactly(nodeInfoHash, nodeInfoHash2)
|
||||
assertEquals(cacheTimeout, networkMapClient.getNetworkMap().cacheMaxAge)
|
||||
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
|
||||
}
|
||||
|
@ -15,17 +15,14 @@ import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.ParametersUpdateInfo
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkMap
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.DEV_ROOT_CA
|
||||
@ -72,33 +69,33 @@ class NetworkMapUpdaterTest {
|
||||
fun `publish node info`() {
|
||||
nodeInfoBuilder.addIdentity(ALICE_NAME)
|
||||
|
||||
val (nodeInfo1, signedNodeInfo1) = nodeInfoBuilder.buildWithSigned()
|
||||
val (sameNodeInfoDifferentTime, signedSameNodeInfoDifferentTime) = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
|
||||
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
|
||||
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
|
||||
|
||||
// Publish node info for the first time.
|
||||
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo1 }
|
||||
updater.updateNodeInfo(nodeInfo1AndSigned)
|
||||
// Sleep as publish is asynchronous.
|
||||
// TODO: Remove sleep in unit test
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapClient, times(1)).publish(any())
|
||||
|
||||
networkMapCache.addNode(nodeInfo1)
|
||||
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
|
||||
|
||||
// Publish the same node info, but with different serial.
|
||||
updater.updateNodeInfo(sameNodeInfoDifferentTime) { signedSameNodeInfoDifferentTime }
|
||||
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// Same node info should not publish twice
|
||||
verify(networkMapClient, times(0)).publish(signedSameNodeInfoDifferentTime)
|
||||
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
|
||||
|
||||
val (differentNodeInfo, signedDifferentNodeInfo) = createNodeInfoAndSigned("Bob")
|
||||
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
|
||||
|
||||
// Publish different node info.
|
||||
updater.updateNodeInfo(differentNodeInfo) { signedDifferentNodeInfo }
|
||||
updater.updateNodeInfo(differentNodeInfoAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
verify(networkMapClient, times(1)).publish(signedDifferentNodeInfo)
|
||||
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -107,7 +104,7 @@ class NetworkMapUpdaterTest {
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
|
||||
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
|
||||
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Test adding new node.
|
||||
networkMapClient.publish(signedNodeInfo1)
|
||||
@ -123,7 +120,7 @@ class NetworkMapUpdaterTest {
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo1)
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo2)
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
networkMapClient.publish(signedNodeInfo4)
|
||||
|
||||
@ -135,7 +132,7 @@ class NetworkMapUpdaterTest {
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo3)
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo4)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -144,10 +141,10 @@ class NetworkMapUpdaterTest {
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
|
||||
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
|
||||
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Add all nodes.
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
networkMapClient.publish(signedNodeInfo1)
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
@ -161,7 +158,7 @@ class NetworkMapUpdaterTest {
|
||||
// 4 node info from network map, and 1 from file.
|
||||
assertThat(nodeInfoMap).hasSize(4)
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
|
||||
|
||||
// Test remove node.
|
||||
nodeInfoMap.clear()
|
||||
@ -174,30 +171,30 @@ class NetworkMapUpdaterTest {
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo4)
|
||||
|
||||
// Node info from file should not be deleted
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `receive node infos from directory, without a network map`() {
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Not subscribed yet.
|
||||
verify(networkMapCache, times(0)).addNode(any())
|
||||
|
||||
updater.subscribeToNetworkMap()
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
|
||||
verify(networkMapCache, times(1)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
|
||||
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `emit new parameters update info on parameters update from network map`() {
|
||||
val paramsFeed = updater.track()
|
||||
val paramsFeed = updater.trackParametersUpdate()
|
||||
val snapshot = paramsFeed.snapshot
|
||||
val updates = paramsFeed.updates.bufferUntilSubscribed()
|
||||
assertEquals(null, snapshot)
|
||||
@ -229,7 +226,7 @@ class NetworkMapUpdaterTest {
|
||||
updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair)})
|
||||
verify(networkMapClient).ackNetworkParametersUpdate(any())
|
||||
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val signedNetworkParams = updateFile.readAll().deserialize<SignedDataWithCert<NetworkParameters>>()
|
||||
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
|
||||
val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
assertEquals(newParameters, paramsFromFile)
|
||||
}
|
||||
@ -279,7 +276,7 @@ class NetworkMapUpdaterTest {
|
||||
}
|
||||
}
|
||||
|
||||
private fun createNodeInfoAndSigned(org: String): Pair<NodeInfo, SignedNodeInfo> {
|
||||
private fun createNodeInfoAndSigned(org: String): NodeInfoAndSigned {
|
||||
return createNodeInfoAndSigned(CordaX500Name(org, "London", "GB"))
|
||||
}
|
||||
}
|
@ -2,15 +2,13 @@ package net.corda.node.services.network
|
||||
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.exists
|
||||
import net.corda.core.internal.readObject
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.NetworkParametersReader
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
@ -57,7 +55,9 @@ class NetworkParametersReaderTest {
|
||||
assertFalse((baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME).exists())
|
||||
assertEquals(server.networkParameters, parameters)
|
||||
// Parameters from update should be moved to `network-parameters` file.
|
||||
val parametersFromFile = (baseDirectory / NETWORK_PARAMS_FILE_NAME).readAll().deserialize<SignedDataWithCert<NetworkParameters>>().verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
val parametersFromFile = (baseDirectory / NETWORK_PARAMS_FILE_NAME)
|
||||
.readObject<SignedNetworkParameters>()
|
||||
.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
|
||||
assertEquals(server.networkParameters, parametersFromFile)
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@ import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -484,7 +483,7 @@ class DriverDSLImpl(
|
||||
val nodeInfoFile = config.corda.baseDirectory.list { paths ->
|
||||
paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
|
||||
}
|
||||
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified()
|
||||
val nodeInfo = nodeInfoFile.readObject<SignedNodeInfo>().verified()
|
||||
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
@ -47,10 +48,11 @@ class TestNodeInfoBuilder(private val intermediateAndRoot: Pair<CertificateAndKe
|
||||
)
|
||||
}
|
||||
|
||||
fun buildWithSigned(serial: Long = 1, platformVersion: Int = 1): Pair<NodeInfo, SignedNodeInfo> {
|
||||
fun buildWithSigned(serial: Long = 1, platformVersion: Int = 1): NodeInfoAndSigned {
|
||||
val nodeInfo = build(serial, platformVersion)
|
||||
val privateKeys = identitiesAndPrivateKeys.map { it.second }
|
||||
return Pair(nodeInfo, nodeInfo.signWith(privateKeys))
|
||||
return NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
identitiesAndPrivateKeys.first { it.first.owningKey == publicKey }.second.sign(serialised.bytes)
|
||||
}
|
||||
}
|
||||
|
||||
fun reset() {
|
||||
@ -58,7 +60,7 @@ class TestNodeInfoBuilder(private val intermediateAndRoot: Pair<CertificateAndKe
|
||||
}
|
||||
}
|
||||
|
||||
fun createNodeInfoAndSigned(vararg names: CordaX500Name, serial: Long = 1, platformVersion: Int = 1): Pair<NodeInfo, SignedNodeInfo> {
|
||||
fun createNodeInfoAndSigned(vararg names: CordaX500Name, serial: Long = 1, platformVersion: Int = 1): NodeInfoAndSigned {
|
||||
val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
names.forEach { nodeInfoBuilder.addIdentity(it) }
|
||||
return nodeInfoBuilder.buildWithSigned(serial, platformVersion)
|
||||
|
Loading…
Reference in New Issue
Block a user