CORDA-1048: Making it simpler to move an existing local deployment of nodes to across different machines. (#2672)

This was achieved by having the hash in the node-info file to be just of the node's X.500 name. This also solves existing duplicate node-info file issues that we've been having.

Also updated the docsite.
This commit is contained in:
Shams Asari
2018-03-01 21:24:10 +00:00
committed by GitHub
parent 14cafee66e
commit 8616f24523
13 changed files with 162 additions and 122 deletions

View File

@ -7,7 +7,7 @@ import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
@ -39,8 +39,7 @@ class NodeInfoWatcherTest {
private val scheduler = TestScheduler()
private val testSubscriber = TestSubscriber<NodeInfo>()
private lateinit var nodeInfo: NodeInfo
private lateinit var signedNodeInfo: SignedNodeInfo
private lateinit var nodeInfoAndSigned: NodeInfoAndSigned
private lateinit var nodeInfoPath: Path
private lateinit var keyManagementService: KeyManagementService
@ -49,9 +48,7 @@ class NodeInfoWatcherTest {
@Before
fun start() {
val nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
nodeInfo = nodeInfoAndSigned.first
signedNodeInfo = nodeInfoAndSigned.second
nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
val identityService = makeTestIdentityService()
keyManagementService = MockKeyManagementService(identityService)
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
@ -62,7 +59,7 @@ class NodeInfoWatcherTest {
fun `save a NodeInfo`() {
assertEquals(0,
tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), signedNodeInfo)
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfoAndSigned)
val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
assertEquals(1, nodeInfoFiles.size)
@ -76,8 +73,8 @@ class NodeInfoWatcherTest {
@Test
fun `save a NodeInfo to JimFs`() {
val jimFs = Jimfs.newFileSystem(Configuration.unix())
val jimFolder = jimFs.getPath("/nodeInfo")
NodeInfoWatcher.saveToFile(jimFolder, signedNodeInfo)
val jimFolder = jimFs.getPath("/nodeInfo").createDirectories()
NodeInfoWatcher.saveToFile(jimFolder, nodeInfoAndSigned)
}
@Test
@ -104,7 +101,7 @@ class NodeInfoWatcherTest {
try {
val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(1, readNodes.size)
assertEquals(nodeInfo, readNodes.first())
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
} finally {
subscription.unsubscribe()
}
@ -129,7 +126,7 @@ class NodeInfoWatcherTest {
testSubscriber.awaitValueCount(1, 5, TimeUnit.SECONDS)
// The same folder can be reported more than once, so take unique values.
val readNodes = testSubscriber.onNextEvents.distinct()
assertEquals(nodeInfo, readNodes.first())
assertEquals(nodeInfoAndSigned.nodeInfo, readNodes.first())
} finally {
subscription.unsubscribe()
}
@ -141,6 +138,6 @@ class NodeInfoWatcherTest {
// Write a nodeInfo under the right path.
private fun createNodeInfoFileInPath() {
NodeInfoWatcher.saveToFile(nodeInfoPath, signedNodeInfo)
NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfoAndSigned)
}
}

View File

@ -61,11 +61,11 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NodeBuildProperties
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.sign
import net.corda.nodeapi.internal.storeLegalIdentity
import org.apache.activemq.artemis.utils.ReusableLatch
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
@ -181,11 +181,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
nodeInfo
}
}
@ -268,11 +268,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
configuration.baseDirectory)
runOnStop += networkMapUpdater::close
networkMapUpdater.updateNodeInfo(services.myInfo) {
it.sign { publicKey, serialised ->
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
}
log.info("Node-info for this node: ${services.myInfo}")
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
}
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit.

View File

@ -7,12 +7,12 @@ import net.corda.core.internal.copyTo
import net.corda.core.internal.div
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.ParametersUpdate
@ -54,18 +54,19 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun updateNodeInfo(newInfo: NodeInfo, signer: (NodeInfo) -> SignedNodeInfo) {
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
// Compare node info without timestamp.
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
// Only publish and write to disk if there are changes to the node info.
val signedNodeInfo = signer(newInfo)
networkMapCache.addNode(newInfo)
fileWatcher.saveToFile(signedNodeInfo)
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
fileWatcher.saveToFile(nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient)
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
}

View File

@ -4,17 +4,26 @@ import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal._contextSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import rx.Observable
import rx.Scheduler
import java.io.IOException
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.stream.Stream
import kotlin.streams.toList
/**
@ -31,34 +40,29 @@ import kotlin.streams.toList
class NodeInfoWatcher(private val nodePath: Path,
private val scheduler: Scheduler,
private val pollInterval: Duration = 5.seconds) {
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val processedNodeInfoFiles = mutableSetOf<Path>()
private val _processedNodeInfoHashes = mutableSetOf<SecureHash>()
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes.toSet()
companion object {
private val logger = contextLogger()
/**
* Saves the given [NodeInfo] to a path.
* The node is 'encoded' as a SignedNodeInfo, signed with the owning key of its first identity.
* The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename
* is used so that one can freely copy these files without fearing to overwrite another one.
*
* @param path the path where to write the file, if non-existent it will be created.
* @param signedNodeInfo the signed NodeInfo.
*/
fun saveToFile(path: Path, signedNodeInfo: SignedNodeInfo) {
try {
path.createDirectories()
signedNodeInfo.serialize()
.open()
.copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${signedNodeInfo.raw.hash}")
} catch (e: Exception) {
logger.warn("Couldn't write node info to file", e)
}
// TODO This method doesn't belong in this class
fun saveToFile(path: Path, nodeInfoAndSigned: NodeInfoAndSigned) {
// By using the hash of the node's first name we ensure:
// 1) node info files for the same node map to the same filename and thus avoid having duplicate files for
// the same node
// 2) avoid having to deal with characters in the X.500 name which are incompatible with the local filesystem
val fileNameHash = nodeInfoAndSigned.nodeInfo.legalIdentities[0].name.serialize().hash
nodeInfoAndSigned
.signed
.serialize()
.open()
.copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}$fileNameHash", REPLACE_EXISTING)
}
}
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val _processedNodeInfoHashes = HashSet<SecureHash>()
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes
init {
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
if (!nodeInfoDirectory.isDirectory()) {
@ -84,7 +88,10 @@ class NodeInfoWatcher(private val nodePath: Path,
.flatMapIterable { loadFromDirectory() }
}
fun saveToFile(signedNodeInfo: SignedNodeInfo) = Companion.saveToFile(nodePath, signedNodeInfo)
// TODO This method doesn't belong in this class
fun saveToFile(nodeInfoAndSigned: NodeInfoAndSigned) {
return Companion.saveToFile(nodePath, nodeInfoAndSigned)
}
/**
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
@ -97,16 +104,15 @@ class NodeInfoWatcher(private val nodePath: Path,
return emptyList()
}
val result = nodeInfoDirectory.list { paths ->
paths.filter { it !in processedNodeInfoFiles }
paths
.filter { it.isRegularFile() }
.map { path ->
processFile(path)?.apply {
processedNodeInfoFiles.add(path)
_processedNodeInfoHashes.add(this.serialize().hash)
.flatMap { path ->
val nodeInfo = processFile(path)?.let {
if (_processedNodeInfoHashes.add(it.signed.raw.hash)) it.nodeInfo else null
}
if (nodeInfo != null) Stream.of(nodeInfo) else Stream.empty()
}
.toList()
.filterNotNull()
}
if (result.isNotEmpty()) {
logger.info("Successfully read ${result.size} NodeInfo files from disk.")
@ -114,14 +120,25 @@ class NodeInfoWatcher(private val nodePath: Path,
return result
}
private fun processFile(file: Path): NodeInfo? {
private fun processFile(file: Path): NodeInfoAndSigned? {
return try {
logger.info("Reading NodeInfo from file: $file")
val signedData = file.readObject<SignedNodeInfo>()
signedData.verified()
val signedNodeInfo = file.readObject<SignedNodeInfo>()
NodeInfoAndSigned(signedNodeInfo)
} catch (e: Exception) {
logger.warn("Exception parsing NodeInfo from file. $file", e)
null
}
}
}
// TODO Remove this once we have a tool that can read AMQP serialised files
fun main(args: Array<String>) {
_contextSerializationEnv.set(SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme())
},
AMQP_P2P_CONTEXT)
)
println(Paths.get(args[0]).readObject<SignedNodeInfo>().verified())
}

View File

@ -18,6 +18,7 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.millis
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
@ -68,33 +69,33 @@ class NetworkMapUpdaterTest {
fun `publish node info`() {
nodeInfoBuilder.addIdentity(ALICE_NAME)
val (nodeInfo1, signedNodeInfo1) = nodeInfoBuilder.buildWithSigned()
val (sameNodeInfoDifferentTime, signedSameNodeInfoDifferentTime) = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
// Publish node info for the first time.
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo1 }
updater.updateNodeInfo(nodeInfo1AndSigned)
// Sleep as publish is asynchronous.
// TODO: Remove sleep in unit test
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapClient, times(1)).publish(any())
networkMapCache.addNode(nodeInfo1)
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
// Publish the same node info, but with different serial.
updater.updateNodeInfo(sameNodeInfoDifferentTime) { signedSameNodeInfoDifferentTime }
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// Same node info should not publish twice
verify(networkMapClient, times(0)).publish(signedSameNodeInfoDifferentTime)
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
val (differentNodeInfo, signedDifferentNodeInfo) = createNodeInfoAndSigned("Bob")
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
// Publish different node info.
updater.updateNodeInfo(differentNodeInfo) { signedDifferentNodeInfo }
updater.updateNodeInfo(differentNodeInfoAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(signedDifferentNodeInfo)
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
}
@Test
@ -103,7 +104,7 @@ class NetworkMapUpdaterTest {
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Test adding new node.
networkMapClient.publish(signedNodeInfo1)
@ -119,7 +120,7 @@ class NetworkMapUpdaterTest {
verify(networkMapCache, times(1)).addNode(nodeInfo1)
verify(networkMapCache, times(1)).addNode(nodeInfo2)
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
@ -131,7 +132,7 @@ class NetworkMapUpdaterTest {
verify(networkMapCache, times(5)).addNode(any())
verify(networkMapCache, times(1)).addNode(nodeInfo3)
verify(networkMapCache, times(1)).addNode(nodeInfo4)
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
}
@Test
@ -140,10 +141,10 @@ class NetworkMapUpdaterTest {
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Add all nodes.
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
networkMapClient.publish(signedNodeInfo1)
networkMapClient.publish(signedNodeInfo2)
networkMapClient.publish(signedNodeInfo3)
@ -157,7 +158,7 @@ class NetworkMapUpdaterTest {
// 4 node info from network map, and 1 from file.
assertThat(nodeInfoMap).hasSize(4)
verify(networkMapCache, times(5)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
// Test remove node.
nodeInfoMap.clear()
@ -170,25 +171,25 @@ class NetworkMapUpdaterTest {
verify(networkMapCache, times(1)).removeNode(nodeInfo4)
// Node info from file should not be deleted
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
}
@Test
fun `receive node infos from directory, without a network map`() {
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
val fileNodeInfoAndSigned = createNodeInfoAndSigned("Info from file")
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, fileNodeInfoAndSigned)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfoAndSigned.nodeInfo.serialize().hash)
}
@Test
@ -275,7 +276,7 @@ class NetworkMapUpdaterTest {
}
}
private fun createNodeInfoAndSigned(org: String): Pair<NodeInfo, SignedNodeInfo> {
private fun createNodeInfoAndSigned(org: String): NodeInfoAndSigned {
return createNodeInfoAndSigned(CordaX500Name(org, "London", "GB"))
}
}