diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 0012e64902..c996b4254b 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -11,7 +11,8 @@ UNRELEASED * Cordform may not specify a value for ``NetworkMap``, when that happens, during the task execution the following happens: 1. Each node is started and its signed serialized NodeInfo is written to disk in the node base directory. 2. Every serialized ``NodeInfo`` above is copied in every other node "additional-node-info" folder under the NodeInfo folder. - * Nodes read all the nodes stored in ``additional-node-info`` when the ``NetworkMapService`` starts up. + +* Nodes read and poll the filesystem for serialized ``NodeInfo`` in the ``additional-node-info`` directory. * ``Cordapp`` now has a name field for identifying CorDapps and all CorDapp names are printed to console at startup. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 3472db31b4..d8146ff1fe 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -170,7 +170,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } private fun saveOwnNodeInfo() { - NodeInfoSerializer().saveToFile(configuration.baseDirectory, info, services.keyManagementService) + NodeInfoWatcher.saveToFile(configuration.baseDirectory, info, services.keyManagementService) } private fun initCertificate() { @@ -409,7 +409,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService, services, cordappProvider, this) makeNetworkServices(tokenizableServices) - return tokenizableServices } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoSerializer.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoSerializer.kt deleted file mode 100644 index 925f393b7a..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoSerializer.kt +++ /dev/null @@ -1,85 +0,0 @@ -package net.corda.node.services.network - -import net.corda.cordform.CordformNode -import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.SignedData -import net.corda.core.internal.createDirectories -import net.corda.core.internal.div -import net.corda.core.internal.isDirectory -import net.corda.core.node.NodeInfo -import net.corda.core.node.services.KeyManagementService -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.loggerFor -import java.io.File -import java.nio.file.Files -import java.nio.file.Path - -/** - * Class containing the logic to serialize and de-serialize a [NodeInfo] to disk and reading it back. - */ -class NodeInfoSerializer { - - companion object { - val logger = loggerFor() - } - - /** - * Saves the given [NodeInfo] to a path. - * The node is 'encoded' as a SignedData, signed with the owning key of its first identity. - * The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename - * is used so that one can freely copy these files without fearing to overwrite another one. - * - * @param path the path where to write the file, if non-existent it will be created. - * @param nodeInfo the NodeInfo to serialize. - * @param keyManager a KeyManagementService used to sign the NodeInfo data. - */ - fun saveToFile(path: Path, nodeInfo: NodeInfo, keyManager: KeyManagementService) { - try { - path.createDirectories() - val serializedBytes = nodeInfo.serialize() - val regSig = keyManager.sign(serializedBytes.bytes, nodeInfo.legalIdentities.first().owningKey) - val signedData = SignedData(serializedBytes, regSig) - val file = (path / ("nodeInfo-" + SecureHash.sha256(serializedBytes.bytes).toString())).toFile() - file.writeBytes(signedData.serialize().bytes) - } catch (e : Exception) { - logger.warn("Couldn't write node info to file: $e") - } - } - - /** - * Loads all the files contained in a given path and returns the deserialized [NodeInfo]s. - * Signatures are checked before returning a value. - * - * @param nodePath the node base path. NodeInfo files are searched for in nodePath/[NODE_INFO_FOLDER] - * @return a list of [NodeInfo]s - */ - fun loadFromDirectory(nodePath: Path): List { - val result = mutableListOf() - val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY - if (!nodeInfoDirectory.isDirectory()) { - logger.info("$nodeInfoDirectory isn't a Directory, not loading NodeInfo from files") - return result - } - for (path in Files.list(nodeInfoDirectory)) { - val file = path.toFile() - if (file.isFile) { - try { - logger.info("Reading NodeInfo from file: $file") - val nodeInfo = loadFromFile(file) - result.add(nodeInfo) - } catch (e: Exception) { - logger.error("Exception parsing NodeInfo from file. $file" , e) - } - } - } - logger.info("Succesfully read ${result.size} NodeInfo files.") - return result - } - - private fun loadFromFile(file: File): NodeInfo { - val signedData = ByteSequence.of(file.readBytes()).deserialize>() - return signedData.verified() - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt new file mode 100644 index 0000000000..2d4db9e3fa --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -0,0 +1,150 @@ +package net.corda.node.services.network + +import net.corda.cordform.CordformNode +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignedData +import net.corda.core.internal.* +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.KeyManagementService +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.loggerFor +import rx.Observable +import rx.Scheduler +import rx.schedulers.Schedulers +import java.nio.file.Path +import java.nio.file.StandardWatchEventKinds +import java.nio.file.WatchEvent +import java.nio.file.WatchKey +import java.nio.file.WatchService +import java.util.concurrent.TimeUnit +import kotlin.streams.toList + +/** + * Class containing the logic to + * - Serialize and de-serialize a [NodeInfo] to disk and reading it back. + * - Poll a directory for new serialized [NodeInfo] + * + * @param path the base path of a node. + * @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for + * testing. It defaults to the io scheduler which is the appropriate value for production uses. + */ +class NodeInfoWatcher(private val nodePath: Path, + private val scheduler: Scheduler = Schedulers.io()) { + + private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY + private val watchService : WatchService? by lazy { initWatch() } + + companion object { + private val logger = loggerFor() + + /** + * Saves the given [NodeInfo] to a path. + * The node is 'encoded' as a SignedData, signed with the owning key of its first identity. + * The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename + * is used so that one can freely copy these files without fearing to overwrite another one. + * + * @param path the path where to write the file, if non-existent it will be created. + * @param nodeInfo the NodeInfo to serialize. + * @param keyManager a KeyManagementService used to sign the NodeInfo data. + */ + fun saveToFile(path: Path, nodeInfo: NodeInfo, keyManager: KeyManagementService) { + try { + path.createDirectories() + val serializedBytes = nodeInfo.serialize() + val regSig = keyManager.sign(serializedBytes.bytes, + nodeInfo.legalIdentities.first().owningKey) + val signedData = SignedData(serializedBytes, regSig) + val file = (path / ("nodeInfo-" + SecureHash.sha256(serializedBytes.bytes).toString())).toFile() + file.writeBytes(signedData.serialize().bytes) + } catch (e: Exception) { + logger.warn("Couldn't write node info to file", e) + } + } + } + + /** + * Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching + * the folder for further updates. + * + * @return an [Observable] returning [NodeInfo]s, there is no guarantee that the same value isn't returned more + * than once. + */ + fun nodeInfoUpdates(): Observable { + val pollForFiles = Observable.interval(5, TimeUnit.SECONDS, scheduler) + .flatMapIterable { pollWatch() } + val readCurrentFiles = Observable.from(loadFromDirectory()) + return readCurrentFiles.mergeWith(pollForFiles) + } + + /** + * Loads all the files contained in a given path and returns the deserialized [NodeInfo]s. + * Signatures are checked before returning a value. + * + * @return a list of [NodeInfo]s + */ + private fun loadFromDirectory(): List { + val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY + if (!nodeInfoDirectory.isDirectory()) { + logger.info("$nodeInfoDirectory isn't a Directory, not loading NodeInfo from files") + return emptyList() + } + val result = nodeInfoDirectory.list { paths -> + paths.filter { it.isRegularFile() } + .map { processFile(it) } + .toList() + .filterNotNull() + } + logger.info("Successfully read ${result.size} NodeInfo files.") + return result + } + + // Polls the watchService for changes to nodeInfoDirectory, return all the newly read NodeInfos. + private fun pollWatch(): List { + if (watchService == null) { + return emptyList() + } + val watchKey: WatchKey = watchService?.poll() ?: return emptyList() + val files = mutableSetOf() + for (event in watchKey.pollEvents()) { + val kind = event.kind() + if (kind == StandardWatchEventKinds.OVERFLOW) continue + + val ev: WatchEvent = uncheckedCast(event) + val filename = ev.context() + val absolutePath = nodeInfoDirectory.resolve(filename) + if (absolutePath.isRegularFile()) { + files.add(absolutePath) + } + } + val valid = watchKey.reset() + if (!valid) { + logger.warn("Can't poll $nodeInfoDirectory anymore, it was probably deleted.") + } + return files.mapNotNull { processFile(it) } + } + + private fun processFile(file: Path) : NodeInfo? { + try { + logger.info("Reading NodeInfo from file: $file") + val signedData = file.readAll().deserialize>() + return signedData.verified() + } catch (e: Exception) { + logger.warn("Exception parsing NodeInfo from file. $file", e) + return null + } + } + + // Create a WatchService watching for changes in nodeInfoDirectory. + private fun initWatch() : WatchService? { + if (!nodeInfoDirectory.isDirectory()) { + logger.warn("Not watching folder $nodeInfoDirectory it doesn't exist or it's not a directory") + return null + } + val watchService = nodeInfoDirectory.fileSystem.newWatchService() + nodeInfoDirectory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY) + logger.info("Watching $nodeInfoDirectory for new files") + return watchService + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index f2f5e8396c..2dac97909a 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -1,11 +1,11 @@ package net.corda.node.services.network import net.corda.core.concurrent.CordaFuture -import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed @@ -40,6 +40,7 @@ import java.security.PublicKey import java.security.SignatureException import java.util.* import javax.annotation.concurrent.ThreadSafe +import kotlin.collections.HashMap /** * Extremely simple in-memory cache of the network map. @@ -87,6 +88,8 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) .sortedBy { it.name.toString() } } + private val nodeInfoSerializer = NodeInfoWatcher(serviceHub.configuration.baseDirectory) + init { loadFromFiles() serviceHub.database.transaction { loadFromDB() } @@ -94,9 +97,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) private fun loadFromFiles() { logger.info("Loading network map from files..") - for (node in NodeInfoSerializer().loadFromDirectory(serviceHub.configuration.baseDirectory)) { - addNode(node) - } + nodeInfoSerializer.nodeInfoUpdates().subscribe { node -> addNode(node) } } override fun getPartyInfo(party: Party): PartyInfo? { diff --git a/node/src/test/kotlin/net/corda/node/services/network/NodeInfoSerializerTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NodeInfoSerializerTest.kt deleted file mode 100644 index 6fe8ea6a28..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/network/NodeInfoSerializerTest.kt +++ /dev/null @@ -1,67 +0,0 @@ -package net.corda.node.services.network - -import net.corda.cordform.CordformNode -import net.corda.core.internal.div -import net.corda.core.node.NodeInfo -import net.corda.core.node.services.KeyManagementService -import net.corda.node.services.identity.InMemoryIdentityService -import net.corda.testing.* -import net.corda.testing.node.MockKeyManagementService -import net.corda.testing.node.NodeBasedTest -import org.junit.Before -import org.junit.Rule -import org.junit.Test -import org.junit.rules.TemporaryFolder -import java.nio.charset.Charset -import kotlin.test.assertEquals -import kotlin.test.assertTrue -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.contentOf - -class NodeInfoSerializerTest : NodeBasedTest() { - - @Rule @JvmField var folder = TemporaryFolder() - - lateinit var keyManagementService: KeyManagementService - - // Object under test - val nodeInfoSerializer = NodeInfoSerializer() - - companion object { - val nodeInfoFileRegex = Regex("nodeInfo\\-.*") - val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0) - } - - @Before - fun start() { - val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) - keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) - } - - @Test - fun `save a NodeInfo`() { - nodeInfoSerializer.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService) - - assertEquals(1, folder.root.list().size) - val fileName = folder.root.list()[0] - assertTrue(fileName.matches(nodeInfoFileRegex)) - val file = (folder.root.path / fileName).toFile() - // Just check that something is written, another tests verifies that the written value can be read back. - assertThat(contentOf(file)).isNotEmpty() - } - - @Test - fun `load an empty Directory`() { - assertEquals(0, nodeInfoSerializer.loadFromDirectory(folder.root.toPath()).size) - } - - @Test - fun `load a non empty Directory`() { - val nodeInfoFolder = folder.newFolder(CordformNode.NODE_INFO_DIRECTORY) - nodeInfoSerializer.saveToFile(nodeInfoFolder.toPath(), nodeInfo, keyManagementService) - val nodeInfos = nodeInfoSerializer.loadFromDirectory(folder.root.toPath()) - - assertEquals(1, nodeInfos.size) - assertEquals(nodeInfo, nodeInfos.first()) - } -} diff --git a/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt new file mode 100644 index 0000000000..758374f5c9 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt @@ -0,0 +1,113 @@ +package net.corda.node.services.network + +import net.corda.cordform.CordformNode +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.KeyManagementService +import net.corda.node.services.identity.InMemoryIdentityService +import net.corda.testing.* +import net.corda.testing.node.MockKeyManagementService +import net.corda.testing.node.NodeBasedTest +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import rx.observers.TestSubscriber +import rx.schedulers.TestScheduler +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.contentOf +import java.nio.file.Path + +class NodeInfoWatcherTest : NodeBasedTest() { + + @Rule @JvmField var folder = TemporaryFolder() + + lateinit var keyManagementService: KeyManagementService + lateinit var nodeInfoPath: Path + val scheduler = TestScheduler(); + val testSubscriber = TestSubscriber() + + // Object under test + lateinit var nodeInfoWatcher: NodeInfoWatcher + + companion object { + val nodeInfoFileRegex = Regex("nodeInfo\\-.*") + val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0) + } + + @Before + fun start() { + val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) + keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) + nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler) + nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY + } + + @Test + fun `save a NodeInfo`() { + NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService) + + assertEquals(1, folder.root.list().size) + val fileName = folder.root.list()[0] + assertTrue(fileName.matches(nodeInfoFileRegex)) + val file = (folder.root.path / fileName).toFile() + // Just check that something is written, another tests verifies that the written value can be read back. + assertThat(contentOf(file)).isNotEmpty() + } + + @Test + fun `load an empty Directory`() { + nodeInfoPath.createDirectories() + + nodeInfoWatcher.nodeInfoUpdates() + .subscribe(testSubscriber) + + val readNodes = testSubscriber.onNextEvents.distinct() + scheduler.advanceTimeBy(1, TimeUnit.HOURS) + assertEquals(0, readNodes.size) + } + + @Test + fun `load a non empty Directory`() { + createNodeInfoFileInPath(nodeInfo) + + nodeInfoWatcher.nodeInfoUpdates() + .subscribe(testSubscriber) + + val readNodes = testSubscriber.onNextEvents.distinct() + + assertEquals(1, readNodes.size) + assertEquals(nodeInfo, readNodes.first()) + } + + @Test + fun `polling folder`() { + nodeInfoPath.createDirectories() + + // Start polling with an empty folder. + nodeInfoWatcher.nodeInfoUpdates() + .subscribe(testSubscriber) + // Ensure the watch service is started. + scheduler.advanceTimeBy(1, TimeUnit.HOURS) + + // Check no nodeInfos are read. + assertEquals(0, testSubscriber.valueCount) + createNodeInfoFileInPath(nodeInfo) + + scheduler.advanceTimeBy(1, TimeUnit.HOURS) + + // The same folder can be reported more than once, so take unique values. + val readNodes = testSubscriber.onNextEvents.distinct() + assertEquals(1, readNodes.size) + assertEquals(nodeInfo, readNodes.first()) + } + + // Write a nodeInfo under the right path. + private fun createNodeInfoFileInPath(nodeInfo: NodeInfo) { + NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfo, keyManagementService) + } +}