mirror of
https://github.com/corda/corda.git
synced 2025-06-21 08:40:03 +00:00
CORDA-833: SignedNodeInfo object for holding a list of signatures, one for each identity in the NodeInfo. This forms part of the network map.
This commit is contained in:
@ -3,14 +3,15 @@ package net.corda.node.services.network
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.nodeapi.internal.NodeInfoFilesCopier
|
||||
import net.corda.testing.*
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.testing.ALICE_NAME
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import net.corda.testing.node.MockKeyManagementService
|
||||
import net.corda.testing.node.makeTestIdentityService
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
@ -27,20 +28,20 @@ import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class NodeInfoWatcherTest {
|
||||
private companion object {
|
||||
val alice = TestIdentity(ALICE_NAME, 70)
|
||||
val nodeInfo = NodeInfo(listOf(), listOf(alice.identity), 0, 0)
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val tempFolder = TemporaryFolder()
|
||||
private lateinit var nodeInfoPath: Path
|
||||
|
||||
private val scheduler = TestScheduler()
|
||||
private val testSubscriber = TestSubscriber<NodeInfo>()
|
||||
|
||||
private lateinit var nodeInfo: NodeInfo
|
||||
private lateinit var signedNodeInfo: SignedNodeInfo
|
||||
private lateinit var nodeInfoPath: Path
|
||||
private lateinit var keyManagementService: KeyManagementService
|
||||
|
||||
// Object under test
|
||||
@ -48,8 +49,11 @@ class NodeInfoWatcherTest {
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
val nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
|
||||
nodeInfo = nodeInfoAndSigned.first
|
||||
signedNodeInfo = nodeInfoAndSigned.second
|
||||
val identityService = makeTestIdentityService()
|
||||
keyManagementService = MockKeyManagementService(identityService, alice.keyPair)
|
||||
keyManagementService = MockKeyManagementService(identityService)
|
||||
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
|
||||
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
|
||||
}
|
||||
@ -58,7 +62,6 @@ class NodeInfoWatcherTest {
|
||||
fun `save a NodeInfo`() {
|
||||
assertEquals(0,
|
||||
tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
|
||||
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
|
||||
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), signedNodeInfo)
|
||||
|
||||
val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
|
||||
@ -74,7 +77,6 @@ class NodeInfoWatcherTest {
|
||||
fun `save a NodeInfo to JimFs`() {
|
||||
val jimFs = Jimfs.newFileSystem(Configuration.unix())
|
||||
val jimFolder = jimFs.getPath("/nodeInfo")
|
||||
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
|
||||
NodeInfoWatcher.saveToFile(jimFolder, signedNodeInfo)
|
||||
}
|
||||
|
||||
@ -82,11 +84,9 @@ class NodeInfoWatcherTest {
|
||||
fun `load an empty Directory`() {
|
||||
nodeInfoPath.createDirectories()
|
||||
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates()
|
||||
.subscribe(testSubscriber)
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates().subscribe(testSubscriber)
|
||||
try {
|
||||
advanceTime()
|
||||
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
assertEquals(0, readNodes.size)
|
||||
} finally {
|
||||
@ -96,15 +96,13 @@ class NodeInfoWatcherTest {
|
||||
|
||||
@Test
|
||||
fun `load a non empty Directory`() {
|
||||
createNodeInfoFileInPath(nodeInfo)
|
||||
createNodeInfoFileInPath()
|
||||
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates()
|
||||
.subscribe(testSubscriber)
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates().subscribe(testSubscriber)
|
||||
advanceTime()
|
||||
|
||||
try {
|
||||
val readNodes = testSubscriber.onNextEvents.distinct()
|
||||
|
||||
assertEquals(1, readNodes.size)
|
||||
assertEquals(nodeInfo, readNodes.first())
|
||||
} finally {
|
||||
@ -117,14 +115,13 @@ class NodeInfoWatcherTest {
|
||||
nodeInfoPath.createDirectories()
|
||||
|
||||
// Start polling with an empty folder.
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates()
|
||||
.subscribe(testSubscriber)
|
||||
val subscription = nodeInfoWatcher.nodeInfoUpdates().subscribe(testSubscriber)
|
||||
try {
|
||||
// Ensure the watch service is started.
|
||||
advanceTime()
|
||||
// Check no nodeInfos are read.
|
||||
assertEquals(0, testSubscriber.valueCount)
|
||||
createNodeInfoFileInPath(nodeInfo)
|
||||
createNodeInfoFileInPath()
|
||||
|
||||
advanceTime()
|
||||
|
||||
@ -143,8 +140,7 @@ class NodeInfoWatcherTest {
|
||||
}
|
||||
|
||||
// Write a nodeInfo under the right path.
|
||||
private fun createNodeInfoFileInPath(nodeInfo: NodeInfo) {
|
||||
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
|
||||
private fun createNodeInfoFileInPath() {
|
||||
NodeInfoWatcher.saveToFile(nodeInfoPath, signedNodeInfo)
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,8 @@ import net.corda.confidential.SwapIdentitiesHandler
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.*
|
||||
@ -32,10 +34,10 @@ import net.corda.node.internal.classloading.requireAnnotation
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
import net.corda.node.internal.cordapp.CordappProviderInternal
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.ContractUpgradeHandler
|
||||
import net.corda.node.services.FinalityHandler
|
||||
import net.corda.node.services.NotaryChangeHandler
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.api.*
|
||||
import net.corda.node.services.config.BFTSMaRtConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
@ -59,6 +61,7 @@ import net.corda.node.shell.InteractiveShell
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.NetworkParameters
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -173,6 +176,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
validateKeystore()
|
||||
}
|
||||
|
||||
private inline fun signNodeInfo(nodeInfo: NodeInfo, sign: (PublicKey, SerializedBytes<NodeInfo>) -> DigitalSignature): SignedNodeInfo {
|
||||
// For now we assume the node has only one identity (excluding any composite ones)
|
||||
val owningKey = nodeInfo.legalIdentities.single { it.owningKey !is CompositeKey }.owningKey
|
||||
val serialised = nodeInfo.serialize()
|
||||
val signature = sign(owningKey, serialised)
|
||||
return SignedNodeInfo(serialised, listOf(signature))
|
||||
}
|
||||
|
||||
open fun generateNodeInfo() {
|
||||
check(started == null) { "Node has already been started" }
|
||||
log.info("Generating nodeInfo ...")
|
||||
@ -184,11 +195,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// a code smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
val (keyPairs, info) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val identityKeypair = keyPairs.first { it.public == info.legalIdentities.first().owningKey }
|
||||
val serialisedNodeInfo = info.serialize()
|
||||
val signature = identityKeypair.sign(serialisedNodeInfo)
|
||||
// TODO: Signed data might not be sufficient for multiple identities, as it only contains one signature.
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, SignedData(serialisedNodeInfo, signature))
|
||||
val signedNodeInfo = signNodeInfo(info) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,9 +258,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
networkMapUpdater.updateNodeInfo(services.myInfo) {
|
||||
val serialisedNodeInfo = it.serialize()
|
||||
val signature = services.keyManagementService.sign(serialisedNodeInfo.bytes, it.legalIdentities.first().owningKey)
|
||||
SignedData(serialisedNodeInfo, signature)
|
||||
signNodeInfo(it) { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
}
|
||||
networkMapUpdater.subscribeToNetworkMap()
|
||||
|
||||
|
@ -15,7 +15,7 @@ import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.nodeapi.internal.NetworkMap
|
||||
import net.corda.nodeapi.internal.NetworkParameters
|
||||
import net.corda.nodeapi.internal.SignedNetworkMap
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import okhttp3.CacheControl
|
||||
import okhttp3.Headers
|
||||
import rx.Subscription
|
||||
@ -31,13 +31,13 @@ import java.util.concurrent.TimeUnit
|
||||
class NetworkMapClient(compatibilityZoneURL: URL, private val trustedRoot: X509Certificate) {
|
||||
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
|
||||
|
||||
fun publish(signedNodeInfo: SignedData<NodeInfo>) {
|
||||
fun publish(signedNodeInfo: SignedNodeInfo) {
|
||||
val publishURL = URL("$networkMapUrl/publish")
|
||||
val conn = publishURL.openHttpConnection()
|
||||
conn.doOutput = true
|
||||
conn.requestMethod = "POST"
|
||||
conn.setRequestProperty("Content-Type", "application/octet-stream")
|
||||
conn.outputStream.use { it.write(signedNodeInfo.serialize().bytes) }
|
||||
conn.outputStream.use { signedNodeInfo.serialize().open().copyTo(it) }
|
||||
|
||||
// This will throw IOException if the response code is not HTTP 200.
|
||||
// This gives a much better exception then reading the error stream.
|
||||
@ -57,12 +57,8 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val trustedRoot: X509C
|
||||
return if (conn.responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
|
||||
null
|
||||
} else {
|
||||
val signedNodeInfo = conn.inputStream.use { it.readBytes() }.deserialize<SignedData<NodeInfo>>()
|
||||
val nodeInfo = signedNodeInfo.verified()
|
||||
// Verify node info is signed by node identity
|
||||
// TODO : Validate multiple signatures when NodeInfo supports multiple identities.
|
||||
require(nodeInfo.legalIdentities.any { it.owningKey == signedNodeInfo.sig.by }) { "NodeInfo must be signed by the node owning key." }
|
||||
nodeInfo
|
||||
val signedNodeInfo = conn.inputStream.use { it.readBytes() }.deserialize<SignedNodeInfo>()
|
||||
signedNodeInfo.verified()
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,7 +96,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedData<NodeInfo>) {
|
||||
fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedNodeInfo) {
|
||||
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
|
||||
// Compare node info without timestamp.
|
||||
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
|
||||
@ -138,9 +134,9 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
// Download new node info from network map
|
||||
try {
|
||||
networkMapClient.getNodeInfo(it)
|
||||
} catch (t: Throwable) {
|
||||
} catch (e: Exception) {
|
||||
// Failure to retrieve one node info shouldn't stop the whole update, log and return null instead.
|
||||
logger.warn("Error encountered when downloading node info '$it', skipping...", t)
|
||||
logger.warn("Error encountered when downloading node info '$it', skipping...", e)
|
||||
null
|
||||
}
|
||||
}.forEach {
|
||||
@ -163,7 +159,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
executor.submit(task) // The check may be expensive, so always run it in the background even the first time.
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedData<NodeInfo>, networkMapClient: NetworkMapClient) {
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
val task = object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
|
@ -2,7 +2,6 @@ package net.corda.node.services.network
|
||||
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
@ -10,6 +9,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.NodeInfoFilesCopier
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import rx.Observable
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
@ -41,14 +41,14 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
private val logger = contextLogger()
|
||||
/**
|
||||
* Saves the given [NodeInfo] to a path.
|
||||
* The node is 'encoded' as a SignedData<NodeInfo>, signed with the owning key of its first identity.
|
||||
* The node is 'encoded' as a SignedNodeInfo, signed with the owning key of its first identity.
|
||||
* The name of the written file will be "nodeInfo-" followed by the hash of the content. The hash in the filename
|
||||
* is used so that one can freely copy these files without fearing to overwrite another one.
|
||||
*
|
||||
* @param path the path where to write the file, if non-existent it will be created.
|
||||
* @param signedNodeInfo the signed NodeInfo.
|
||||
*/
|
||||
fun saveToFile(path: Path, signedNodeInfo: SignedData<NodeInfo>) {
|
||||
fun saveToFile(path: Path, signedNodeInfo: SignedNodeInfo) {
|
||||
try {
|
||||
path.createDirectories()
|
||||
signedNodeInfo.serialize()
|
||||
@ -85,7 +85,7 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
.flatMapIterable { loadFromDirectory() }
|
||||
}
|
||||
|
||||
fun saveToFile(signedNodeInfo: SignedData<NodeInfo>) = Companion.saveToFile(nodePath, signedNodeInfo)
|
||||
fun saveToFile(signedNodeInfo: SignedNodeInfo) = Companion.saveToFile(nodePath, signedNodeInfo)
|
||||
|
||||
/**
|
||||
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
|
||||
@ -118,7 +118,7 @@ class NodeInfoWatcher(private val nodePath: Path,
|
||||
private fun processFile(file: Path): NodeInfo? {
|
||||
return try {
|
||||
logger.info("Reading NodeInfo from file: $file")
|
||||
val signedData = file.readAll().deserialize<SignedData<NodeInfo>>()
|
||||
val signedData = file.readAll().deserialize<SignedNodeInfo>()
|
||||
signedData.verified()
|
||||
} catch (e: Exception) {
|
||||
logger.warn("Exception parsing NodeInfo from file. $file", e)
|
||||
|
@ -5,10 +5,12 @@ import net.corda.core.crypto.sha256
|
||||
import net.corda.core.internal.cert
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.network.TestNodeInfoFactory.createNodeInfo
|
||||
import net.corda.testing.ALICE_NAME
|
||||
import net.corda.testing.BOB_NAME
|
||||
import net.corda.testing.DEV_TRUST_ROOT
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import net.corda.testing.node.internal.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
@ -23,13 +25,12 @@ class NetworkMapClientTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
|
||||
private val cacheTimeout = 100000.seconds
|
||||
|
||||
private lateinit var server: NetworkMapServer
|
||||
private lateinit var networkMapClient: NetworkMapClient
|
||||
|
||||
companion object {
|
||||
private val cacheTimeout = 100000.seconds
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
server = NetworkMapServer(cacheTimeout, PortAllocation.Incremental(10000).nextHostAndPort())
|
||||
@ -44,9 +45,7 @@ class NetworkMapClientTest {
|
||||
|
||||
@Test
|
||||
fun `registered node is added to the network map`() {
|
||||
// Create node info.
|
||||
val signedNodeInfo = createNodeInfo("Test1")
|
||||
val nodeInfo = signedNodeInfo.verified()
|
||||
val (nodeInfo, signedNodeInfo) = createNodeInfoAndSigned(ALICE_NAME)
|
||||
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
|
||||
@ -55,8 +54,8 @@ class NetworkMapClientTest {
|
||||
assertThat(networkMapClient.getNetworkMap().networkMap.nodeInfoHashes).containsExactly(nodeInfoHash)
|
||||
assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash))
|
||||
|
||||
val signedNodeInfo2 = createNodeInfo("Test2")
|
||||
val nodeInfo2 = signedNodeInfo2.verified()
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned(BOB_NAME)
|
||||
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
|
||||
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
|
||||
|
@ -1,69 +1,70 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.google.common.jimfs.Configuration
|
||||
import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.times
|
||||
import com.nhaarman.mockito_kotlin.verify
|
||||
import net.corda.cordform.CordformNode
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.nodeapi.internal.NetworkMap
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.NetworkMap
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.testing.ALICE_NAME
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.schedulers.TestScheduler
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapUpdaterTest {
|
||||
companion object {
|
||||
val NETWORK_PARAMS_HASH = SecureHash.randomSHA256()
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
private val jimFs = Jimfs.newFileSystem(Configuration.unix())
|
||||
private val baseDir = jimFs.getPath("/node")
|
||||
|
||||
private val fs = Jimfs.newFileSystem(unix())
|
||||
private val baseDir = fs.getPath("/node")
|
||||
private val networkMapCache = createMockNetworkMapCache()
|
||||
private val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedNodeInfo>()
|
||||
private val cacheExpiryMs = 100
|
||||
private val networkMapClient = createMockNetworkMapClient()
|
||||
private val scheduler = TestScheduler()
|
||||
private val networkParametersHash = SecureHash.randomSHA256()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash)
|
||||
private val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
updater.close()
|
||||
fs.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `publish node info`() {
|
||||
val keyPair = Crypto.generateKeyPair()
|
||||
nodeInfoBuilder.addIdentity(ALICE_NAME)
|
||||
|
||||
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1").verified()
|
||||
val signedNodeInfo = TestNodeInfoFactory.sign(keyPair, nodeInfo1)
|
||||
|
||||
val sameNodeInfoDifferentTime = nodeInfo1.copy(serial = System.currentTimeMillis())
|
||||
val signedSameNodeInfoDifferentTime = TestNodeInfoFactory.sign(keyPair, sameNodeInfoDifferentTime)
|
||||
|
||||
val differentNodeInfo = nodeInfo1.copy(addresses = listOf(NetworkHostAndPort("my.new.host.com", 1000)))
|
||||
val signedDifferentNodeInfo = TestNodeInfoFactory.sign(keyPair, differentNodeInfo)
|
||||
|
||||
val networkMapCache = getMockNetworkMapCache()
|
||||
|
||||
val networkMapClient = mock<NetworkMapClient>()
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
val (nodeInfo1, signedNodeInfo1) = nodeInfoBuilder.buildWithSigned()
|
||||
val (sameNodeInfoDifferentTime, signedSameNodeInfoDifferentTime) = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
|
||||
|
||||
// Publish node info for the first time.
|
||||
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo }
|
||||
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo1 }
|
||||
// Sleep as publish is asynchronous.
|
||||
// TODO: Remove sleep in unit test
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapClient, times(1)).publish(any())
|
||||
|
||||
networkMapCache.addNode(nodeInfo1)
|
||||
@ -71,167 +72,144 @@ class NetworkMapUpdaterTest {
|
||||
// Publish the same node info, but with different serial.
|
||||
updater.updateNodeInfo(sameNodeInfoDifferentTime) { signedSameNodeInfoDifferentTime }
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// Same node info should not publish twice
|
||||
verify(networkMapClient, times(0)).publish(signedSameNodeInfoDifferentTime)
|
||||
|
||||
val (differentNodeInfo, signedDifferentNodeInfo) = createNodeInfoAndSigned("Bob")
|
||||
|
||||
// Publish different node info.
|
||||
updater.updateNodeInfo(differentNodeInfo) { signedDifferentNodeInfo }
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
verify(networkMapClient, times(1)).publish(signedDifferentNodeInfo)
|
||||
|
||||
updater.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `process add node updates from network map, with additional node infos from dir`() {
|
||||
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1")
|
||||
val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2")
|
||||
val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3")
|
||||
val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4")
|
||||
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
|
||||
val networkMapCache = getMockNetworkMapCache()
|
||||
|
||||
val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedData<NodeInfo>>()
|
||||
val networkMapClient = mock<NetworkMapClient> {
|
||||
on { publish(any()) }.then {
|
||||
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), NETWORK_PARAMS_HASH), 100.millis) }
|
||||
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
|
||||
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
|
||||
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Test adding new node.
|
||||
networkMapClient.publish(nodeInfo1)
|
||||
networkMapClient.publish(signedNodeInfo1)
|
||||
// Not subscribed yet.
|
||||
verify(networkMapCache, times(0)).addNode(any())
|
||||
|
||||
updater.subscribeToNetworkMap()
|
||||
networkMapClient.publish(nodeInfo2)
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapCache, times(2)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo1.verified())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo2.verified())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo1)
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo2)
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
|
||||
networkMapClient.publish(nodeInfo3)
|
||||
networkMapClient.publish(nodeInfo4)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
networkMapClient.publish(signedNodeInfo4)
|
||||
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// 4 node info from network map, and 1 from file.
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo3.verified())
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo4.verified())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
|
||||
|
||||
updater.close()
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo3)
|
||||
verify(networkMapCache, times(1)).addNode(nodeInfo4)
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `process remove node updates from network map, with additional node infos from dir`() {
|
||||
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1")
|
||||
val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2")
|
||||
val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3")
|
||||
val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4")
|
||||
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
|
||||
val networkMapCache = getMockNetworkMapCache()
|
||||
|
||||
val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedData<NodeInfo>>()
|
||||
val networkMapClient = mock<NetworkMapClient> {
|
||||
on { publish(any()) }.then {
|
||||
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), NETWORK_PARAMS_HASH), 100.millis) }
|
||||
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")
|
||||
val (nodeInfo2, signedNodeInfo2) = createNodeInfoAndSigned("Info 2")
|
||||
val (nodeInfo3, signedNodeInfo3) = createNodeInfoAndSigned("Info 3")
|
||||
val (nodeInfo4, signedNodeInfo4) = createNodeInfoAndSigned("Info 4")
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Add all nodes.
|
||||
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
|
||||
networkMapClient.publish(nodeInfo1)
|
||||
networkMapClient.publish(nodeInfo2)
|
||||
networkMapClient.publish(nodeInfo3)
|
||||
networkMapClient.publish(nodeInfo4)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
networkMapClient.publish(signedNodeInfo1)
|
||||
networkMapClient.publish(signedNodeInfo2)
|
||||
networkMapClient.publish(signedNodeInfo3)
|
||||
networkMapClient.publish(signedNodeInfo4)
|
||||
|
||||
updater.subscribeToNetworkMap()
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// 4 node info from network map, and 1 from file.
|
||||
assertEquals(4, nodeInfoMap.size)
|
||||
assertThat(nodeInfoMap).hasSize(4)
|
||||
verify(networkMapCache, times(5)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
|
||||
// Test remove node.
|
||||
nodeInfoMap.clear()
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapCache, times(4)).removeNode(any())
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo1.verified())
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo2.verified())
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo3.verified())
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo4.verified())
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo1)
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo2)
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo3)
|
||||
verify(networkMapCache, times(1)).removeNode(nodeInfo4)
|
||||
|
||||
// Node info from file should not be deleted
|
||||
assertEquals(1, networkMapCache.allNodeHashes.size)
|
||||
assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first())
|
||||
|
||||
updater.close()
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `receive node infos from directory, without a network map`() {
|
||||
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
|
||||
|
||||
val networkMapCache = getMockNetworkMapCache()
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null, NETWORK_PARAMS_HASH)
|
||||
val (fileNodeInfo, signedFileNodeInfo) = createNodeInfoAndSigned("Info from file")
|
||||
|
||||
// Not subscribed yet.
|
||||
verify(networkMapCache, times(0)).addNode(any())
|
||||
|
||||
updater.subscribeToNetworkMap()
|
||||
|
||||
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
|
||||
NodeInfoWatcher.saveToFile(baseDir / NODE_INFO_DIRECTORY, signedFileNodeInfo)
|
||||
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
|
||||
|
||||
verify(networkMapCache, times(1)).addNode(any())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
|
||||
verify(networkMapCache, times(1)).addNode(fileNodeInfo)
|
||||
|
||||
assertEquals(1, networkMapCache.allNodeHashes.size)
|
||||
assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first())
|
||||
|
||||
updater.close()
|
||||
assertThat(networkMapCache.allNodeHashes).containsOnly(fileNodeInfo.serialize().hash)
|
||||
}
|
||||
|
||||
private fun getMockNetworkMapCache() = mock<NetworkMapCacheInternal> {
|
||||
val data = ConcurrentHashMap<Party, NodeInfo>()
|
||||
on { addNode(any()) }.then {
|
||||
val nodeInfo = it.arguments.first() as NodeInfo
|
||||
data.put(nodeInfo.legalIdentities.first(), nodeInfo)
|
||||
private fun createMockNetworkMapClient(): NetworkMapClient {
|
||||
return mock {
|
||||
on { publish(any()) }.then {
|
||||
val signedNodeInfo: SignedNodeInfo = uncheckedCast(it.arguments[0])
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then {
|
||||
NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), networkParametersHash), cacheExpiryMs.millis)
|
||||
}
|
||||
on { getNodeInfo(any()) }.then {
|
||||
nodeInfoMap[it.arguments[0]]?.verified()
|
||||
}
|
||||
}
|
||||
on { removeNode(any()) }.then { data.remove((it.arguments.first() as NodeInfo).legalIdentities.first()) }
|
||||
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments.first()] }
|
||||
on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
|
||||
on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments.first() } }
|
||||
}
|
||||
|
||||
private fun createMockNetworkMapCache(): NetworkMapCacheInternal {
|
||||
return mock {
|
||||
val data = ConcurrentHashMap<Party, NodeInfo>()
|
||||
on { addNode(any()) }.then {
|
||||
val nodeInfo = it.arguments[0] as NodeInfo
|
||||
data.put(nodeInfo.legalIdentities[0], nodeInfo)
|
||||
}
|
||||
on { removeNode(any()) }.then { data.remove((it.arguments[0] as NodeInfo).legalIdentities[0]) }
|
||||
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments[0]] }
|
||||
on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
|
||||
on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments[0] } }
|
||||
}
|
||||
}
|
||||
|
||||
private fun createNodeInfoAndSigned(org: String): Pair<NodeInfo, SignedNodeInfo> {
|
||||
return createNodeInfoAndSigned(CordaX500Name(org, "London", "GB"))
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
import java.security.KeyPair
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.Certificate
|
||||
import java.security.cert.X509Certificate
|
||||
|
||||
object TestNodeInfoFactory {
|
||||
private val rootCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
private val rootCACert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Corda Node Root CA", organisation = "R3 LTD", locality = "London", country = "GB"), rootCAKey)
|
||||
private val intermediateCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
private val intermediateCACert = X509Utilities.createCertificate(CertificateType.INTERMEDIATE_CA, rootCACert, rootCAKey, X500Name("CN=Corda Node Intermediate CA,L=London"), intermediateCAKey.public)
|
||||
|
||||
fun createNodeInfo(organisation: String): SignedData<NodeInfo> {
|
||||
val keyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
|
||||
val clientCert = X509Utilities.createCertificate(CertificateType.NODE_CA, intermediateCACert, intermediateCAKey, CordaX500Name(organisation = organisation, locality = "London", country = "GB"), keyPair.public)
|
||||
val certPath = buildCertPath(clientCert.toX509Certificate(), intermediateCACert.toX509Certificate(), rootCACert.toX509Certificate())
|
||||
val nodeInfo = NodeInfo(listOf(NetworkHostAndPort("my.$organisation.com", 1234)), listOf(PartyAndCertificate(certPath)), 1, serial = 1L)
|
||||
return sign(keyPair, nodeInfo)
|
||||
}
|
||||
|
||||
fun <T : Any> sign(keyPair: KeyPair, t: T): SignedData<T> {
|
||||
// Create digital signature.
|
||||
val digitalSignature = DigitalSignature.WithKey(keyPair.public, Crypto.doSign(keyPair.private, t.serialize().bytes))
|
||||
return SignedData(t.serialize(), digitalSignature)
|
||||
}
|
||||
|
||||
private fun buildCertPath(vararg certificates: Certificate): CertPath {
|
||||
return X509CertificateFactory().generateCertPath(*certificates)
|
||||
}
|
||||
|
||||
private fun X509CertificateHolder.toX509Certificate(): X509Certificate {
|
||||
return X509CertificateFactory().generateCertificate(encoded.inputStream())
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user