Replace artemis network map with http network map (#1970)

* Network map cache using Network map client instead of artemis. -- WIP

* fix up after rebase

* address PR issues, split network map update test, added todos to remove sleeps

* move jimfs and baseDir to field variable
This commit is contained in:
Patrick Kuo
2017-11-14 11:37:50 +00:00
committed by GitHub
parent 3627cc9fc2
commit 64a9946f03
27 changed files with 585 additions and 154 deletions

View File

@ -3,14 +3,16 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SignedData
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.serialize
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.testing.ALICE
import net.corda.testing.ALICE_KEY
import net.corda.testing.getTestPartyAndCertificate
import net.corda.testing.*
import net.corda.testing.node.MockKeyManagementService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.contentOf
import org.junit.Before
@ -38,12 +40,15 @@ class NodeInfoWatcherTest {
private lateinit var nodeInfoPath: Path
private val scheduler = TestScheduler()
private val testSubscriber = TestSubscriber<NodeInfo>()
private lateinit var keyManagementService: KeyManagementService
// Object under test
private lateinit var nodeInfoWatcher: NodeInfoWatcher
@Before
fun start() {
val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT)
keyManagementService = MockKeyManagementService(identityService, ALICE_KEY)
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler)
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
}
@ -52,7 +57,8 @@ class NodeInfoWatcherTest {
fun `save a NodeInfo`() {
assertEquals(0,
tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size)
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfo, ALICE_KEY)
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), signedNodeInfo)
val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }
assertEquals(1, nodeInfoFiles.size)
@ -67,7 +73,8 @@ class NodeInfoWatcherTest {
fun `save a NodeInfo to JimFs`() {
val jimFs = Jimfs.newFileSystem(Configuration.unix())
val jimFolder = jimFs.getPath("/nodeInfo")
NodeInfoWatcher.saveToFile(jimFolder, nodeInfo, ALICE_KEY)
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
NodeInfoWatcher.saveToFile(jimFolder, signedNodeInfo)
}
@Test
@ -136,6 +143,7 @@ class NodeInfoWatcherTest {
// Write a nodeInfo under the right path.
private fun createNodeInfoFileInPath(nodeInfo: NodeInfo) {
NodeInfoWatcher.saveToFile(nodeInfoPath, nodeInfo, ALICE_KEY)
val signedNodeInfo = SignedData(nodeInfo.serialize(), keyManagementService.sign(nodeInfo.serialize().bytes, nodeInfo.legalIdentities.first().owningKey))
NodeInfoWatcher.saveToFile(nodeInfoPath, signedNodeInfo)
}
}

View File

@ -4,6 +4,7 @@ import joptsimple.OptionParser
import joptsimple.util.EnumConverter
import net.corda.core.internal.div
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.parseAsNodeConfiguration
import org.slf4j.event.Level
import java.io.PrintStream
@ -69,6 +70,11 @@ data class CmdLineOptions(val baseDirectory: Path,
val noLocalShell: Boolean,
val sshdServer: Boolean,
val justGenerateNodeInfo: Boolean) {
fun loadConfig() = ConfigHelper
.loadConfig(baseDirectory, configFile).parseAsNodeConfiguration()
fun loadConfig(): NodeConfiguration {
val config = ConfigHelper.loadConfig(baseDirectory, configFile).parseAsNodeConfiguration()
if (isRegistration) {
requireNotNull(config.compatibilityZoneURL) { "Compatibility Zone Url must be provided in registration mode." }
}
return config
}
}

View File

@ -8,6 +8,7 @@ import net.corda.confidential.SwapIdentitiesHandler
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sign
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -18,10 +19,7 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
@ -44,9 +42,7 @@ import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.network.*
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
@ -72,6 +68,7 @@ import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
@ -135,6 +132,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val runOnStop = ArrayList<() -> Any?>()
protected lateinit var database: CordaPersistence
protected val _nodeReadyFuture = openFuture<Unit>()
protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) }
/** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit>
@ -170,7 +169,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
check(started == null) { "Node has already been started" }
log.info("Generating nodeInfo ...")
initCertificate()
initNodeInfo()
val keyPairs = initNodeInfo()
val identityKeypair = keyPairs.first { it.public == info.legalIdentities.first().owningKey }
val serialisedNodeInfo = info.serialize()
val signature = identityKeypair.sign(serialisedNodeInfo)
// TODO: Signed data might not be sufficient for multiple identities, as it only contains one signature.
NodeInfoWatcher.saveToFile(configuration.baseDirectory, SignedData(serialisedNodeInfo, signature))
}
open fun start(): StartedNode<AbstractNode> {
@ -184,8 +188,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService) {
val transactionStorage = makeTransactionStorage()
val stateLoader = StateLoaderImpl(transactionStorage)
val services = makeServices(keyPairs, schemaService, transactionStorage, stateLoader)
val notaryService = makeNotaryService(services)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader)
val notaryService = makeNotaryService(nodeServices)
smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(
@ -208,7 +212,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
tokenizableServices = services + cordaServices + schedulerService
tokenizableServices = nodeServices + cordaServices + schedulerService
registerCordappFlows()
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
@ -216,6 +220,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
runOnStop += network::stop
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
val networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient)
runOnStop += networkMapUpdater::close
networkMapUpdater.updateNodeInfo(services.myInfo) {
val serialisedNodeInfo = it.serialize()
val signature = services.keyManagementService.sign(serialisedNodeInfo.bytes, it.legalIdentities.first().owningKey)
SignedData(serialisedNodeInfo, signature)
}
networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit.
services.networkMapCache.addNode(info)
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
@ -245,16 +262,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
identity
}
}
info = NodeInfo(
myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion,
platformClock.instant().toEpochMilli()
)
NodeInfoWatcher.saveToFile(configuration.baseDirectory, info, identityKeyPair)
return keyPairs
}
@ -727,7 +740,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
NetworkMapCacheImpl(
PersistentNetworkMapCache(
this@AbstractNode.database,
this@AbstractNode.configuration,
networkParameters.notaries),
identityService)
}
@ -768,4 +780,4 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException: Exception()
internal class NetworkMapCacheEmptyException : Exception()

View File

@ -176,15 +176,21 @@ open class Node(configuration: NodeConfiguration,
* TODO this code used to rely on the networkmap node, we might want to look at a different solution.
*/
private fun tryDetectIfNotPublicHost(host: String): String? {
if (!AddressUtils.isPublic(host)) {
return if (!AddressUtils.isPublic(host)) {
val foundPublicIP = AddressUtils.tryDetectPublicIP()
if (foundPublicIP != null) {
if (foundPublicIP == null) {
val retrievedHostName = networkMapClient?.myPublicHostname()
if (retrievedHostName != null) {
log.info("Retrieved public IP from Network Map Service: $this. This will be used instead of the provided \"$host\" as the advertised address.")
}
retrievedHostName
} else {
log.info("Detected public IP: ${foundPublicIP.hostAddress}. This will be used instead of the provided \"$host\" as the advertised address.")
return foundPublicIP.hostAddress
foundPublicIP.hostAddress
}
} else {
null
}
return null
}
override fun startMessagingService(rpcOps: RPCOps) {

View File

@ -141,14 +141,15 @@ open class NodeStartup(val args: Array<String>) {
}
open protected fun maybeRegisterWithNetworkAndExit(cmdlineOptions: CmdLineOptions, conf: NodeConfiguration) {
if (!cmdlineOptions.isRegistration) return
val compatibilityZoneURL = conf.compatibilityZoneURL
if (!cmdlineOptions.isRegistration || compatibilityZoneURL == null) return
println()
println("******************************************************************")
println("* *")
println("* Registering as a new participant with Corda network *")
println("* *")
println("******************************************************************")
NetworkRegistrationHelper(conf, HTTPNetworkRegistrationService(conf.certificateSigningService)).buildKeystore()
NetworkRegistrationHelper(conf, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore()
exitProcess(0)
}

View File

@ -30,6 +30,10 @@ import net.corda.node.utilities.CordaPersistence
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
val allNodeHashes: List<SecureHash>
fun getNodeByHash(nodeHash: SecureHash): NodeInfo?
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)

View File

@ -25,7 +25,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val rpcUsers: List<User>
val devMode: Boolean
val devModeOptions: DevModeOptions?
val certificateSigningService: URL
val compatibilityZoneURL: URL?
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType
val messageRedeliveryDelaySeconds: Int
@ -89,7 +89,7 @@ data class NodeConfigurationImpl(
override val trustStorePassword: String,
override val dataSourceProperties: Properties,
override val database: Properties?,
override val certificateSigningService: URL,
override val compatibilityZoneURL: URL? = null,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,
// TODO typesafe config supports the notion of durations. Make use of that by mapping it to java.time.Duration.

View File

@ -1,70 +1,152 @@
package net.corda.node.services.network
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.internal.openHttpConnection
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import okhttp3.CacheControl
import okhttp3.Headers
import rx.Subscription
import java.io.BufferedReader
import java.io.Closeable
import java.net.HttpURLConnection
import java.net.URL
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
interface NetworkMapClient {
/**
* Publish node info to network map service.
*/
fun publish(signedNodeInfo: SignedData<NodeInfo>)
class NetworkMapClient(compatibilityZoneURL: URL) {
companion object {
val logger = loggerFor<NetworkMapClient>()
}
/**
* Retrieve [NetworkMap] from the network map service containing list of node info hashes and network parameter hash.
*/
// TODO: Use NetworkMap object when available.
fun getNetworkMap(): List<SecureHash>
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
/**
* Retrieve [NodeInfo] from network map service using the node info hash.
*/
fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo?
// TODO: Implement getNetworkParameter when its available.
//fun getNetworkParameter(networkParameterHash: SecureHash): NetworkParameter
}
class HTTPNetworkMapClient(private val networkMapUrl: String) : NetworkMapClient {
override fun publish(signedNodeInfo: SignedData<NodeInfo>) {
fun publish(signedNodeInfo: SignedData<NodeInfo>) {
val publishURL = URL("$networkMapUrl/publish")
val conn = publishURL.openConnection() as HttpURLConnection
val conn = publishURL.openHttpConnection()
conn.doOutput = true
conn.requestMethod = "POST"
conn.setRequestProperty("Content-Type", "application/octet-stream")
conn.outputStream.write(signedNodeInfo.serialize().bytes)
when (conn.responseCode) {
HttpURLConnection.HTTP_OK -> return
HttpURLConnection.HTTP_UNAUTHORIZED -> throw IllegalArgumentException(conn.errorStream.bufferedReader().readLine())
else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'")
conn.outputStream.use { it.write(signedNodeInfo.serialize().bytes) }
// This will throw IOException if the response code is not HTTP 200.
// This gives a much better exception then reading the error stream.
conn.inputStream.close()
}
fun getNetworkMap(): NetworkMapResponse {
val conn = networkMapUrl.openHttpConnection()
val response = conn.inputStream.bufferedReader().use(BufferedReader::readLine)
val networkMap = ObjectMapper().readValue(response, List::class.java).map { SecureHash.parse(it.toString()) }
val timeout = CacheControl.parse(Headers.of(conn.headerFields.filterKeys { it != null }.mapValues { it.value.first() })).maxAgeSeconds().seconds
return NetworkMapResponse(networkMap, timeout)
}
fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo? {
val conn = URL("$networkMapUrl/$nodeInfoHash").openHttpConnection()
return if (conn.responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
null
} else {
conn.inputStream.use { it.readBytes() }.deserialize()
}
}
override fun getNetworkMap(): List<SecureHash> {
val conn = URL(networkMapUrl).openConnection() as HttpURLConnection
fun myPublicHostname(): String {
val conn = URL("$networkMapUrl/my-hostname").openHttpConnection()
return conn.inputStream.bufferedReader().use(BufferedReader::readLine)
}
}
return when (conn.responseCode) {
HttpURLConnection.HTTP_OK -> {
val response = conn.inputStream.bufferedReader().use { it.readLine() }
ObjectMapper().readValue(response, List::class.java).map { SecureHash.parse(it.toString()) }
data class NetworkMapResponse(val networkMap: List<SecureHash>, val cacheMaxAge: Duration)
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?) : Closeable {
companion object {
private val logger = loggerFor<NetworkMapUpdater>()
private val retryInterval = 1.minutes
}
private val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
private var fileWatcherSubscription: Subscription? = null
override fun close() {
fileWatcherSubscription?.unsubscribe()
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
}
fun updateNodeInfo(newInfo: NodeInfo, signNodeInfo: (NodeInfo) -> SignedData<NodeInfo>) {
val oldInfo = networkMapCache.getNodeByLegalIdentity(newInfo.legalIdentities.first())
// Compare node info without timestamp.
if (newInfo.copy(serial = 0L) == oldInfo?.copy(serial = 0L)) return
// Only publish and write to disk if there are changes to the node info.
val signedNodeInfo = signNodeInfo(newInfo)
fileWatcher.saveToFile(signedNodeInfo)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(signedNodeInfo, networkMapClient)
}
}
fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe(networkMapCache::addNode)
if (networkMapClient == null) return
// Subscribe to remote network map if configured.
val task = object : Runnable {
override fun run() {
val nextScheduleDelay = try {
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
val currentNodeHashes = networkMapCache.allNodeHashes
(networkMap - currentNodeHashes).mapNotNull {
// Download new node info from network map
networkMapClient.getNodeInfo(it)
}.forEach {
// Add new node info to the network map cache, these could be new node info or modification of node info for existing nodes.
networkMapCache.addNode(it)
}
// Remove node info from network map.
(currentNodeHashes - networkMap - fileWatcher.processedNodeInfoHashes)
.mapNotNull(networkMapCache::getNodeByHash)
.forEach(networkMapCache::removeNode)
cacheTimeout
} catch (t: Throwable) {
logger.warn("Error encountered while updating network map, will retry in $retryInterval", t)
retryInterval
}
// Schedule the next update.
executor.schedule(this, nextScheduleDelay.toMillis(), TimeUnit.MILLISECONDS)
}
else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'")
}
executor.submit(task) // The check may be expensive, so always run it in the background even the first time.
}
override fun getNodeInfo(nodeInfoHash: SecureHash): NodeInfo? {
val nodeInfoURL = URL("$networkMapUrl/$nodeInfoHash")
val conn = nodeInfoURL.openConnection() as HttpURLConnection
return when (conn.responseCode) {
HttpURLConnection.HTTP_OK -> conn.inputStream.readBytes().deserialize()
HttpURLConnection.HTTP_NOT_FOUND -> null
else -> throw IllegalArgumentException("Unexpected response code ${conn.responseCode}, response error message: '${conn.errorStream.bufferedReader().readLines()}'")
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedData<NodeInfo>, networkMapClient: NetworkMapClient) {
val task = object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in $retryInterval.", t)
// TODO: Exponential backoff?
executor.schedule(this, retryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
}
executor.submit(task)
}
}

View File

@ -1,8 +1,8 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.sign
import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
@ -15,7 +15,7 @@ import rx.Scheduler
import rx.schedulers.Schedulers
import java.io.IOException
import java.nio.file.Path
import java.security.KeyPair
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.streams.toList
@ -24,19 +24,20 @@ import kotlin.streams.toList
* - Serialize and de-serialize a [NodeInfo] to disk and reading it back.
* - Poll a directory for new serialized [NodeInfo]
*
* @param path the base path of a node.
* @param pollFrequencyMsec how often to poll the filesystem in milliseconds. Any value smaller than 5 seconds will
* be treated as 5 seconds.
* @param nodePath the base path of a node.
* @param pollInterval how often to poll the filesystem in milliseconds. Must be longer then 5 seconds.
* @param scheduler a [Scheduler] for the rx [Observable] returned by [nodeInfoUpdates], this is mainly useful for
* testing. It defaults to the io scheduler which is the appropriate value for production uses.
*/
// TODO: Use NIO watch service instead?
class NodeInfoWatcher(private val nodePath: Path,
pollFrequencyMsec: Long = 5.seconds.toMillis(),
private val pollInterval: Duration = 5.seconds,
private val scheduler: Scheduler = Schedulers.io()) {
private val nodeInfoDirectory = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val pollFrequencyMsec: Long = maxOf(pollFrequencyMsec, 5.seconds.toMillis())
private val successfullyProcessedFiles = mutableSetOf<Path>()
private val processedNodeInfoFiles = mutableSetOf<Path>()
private val _processedNodeInfoHashes = mutableSetOf<SecureHash>()
val processedNodeInfoHashes: Set<SecureHash> get() = _processedNodeInfoHashes.toSet()
companion object {
private val logger = loggerFor<NodeInfoWatcher>()
@ -48,17 +49,14 @@ class NodeInfoWatcher(private val nodePath: Path,
* is used so that one can freely copy these files without fearing to overwrite another one.
*
* @param path the path where to write the file, if non-existent it will be created.
* @param nodeInfo the NodeInfo to serialize.
* @param signingKey used to sign the NodeInfo data.
* @param signedNodeInfo the signed NodeInfo.
*/
fun saveToFile(path: Path, nodeInfo: NodeInfo, signingKey: KeyPair) {
fun saveToFile(path: Path, signedNodeInfo: SignedData<NodeInfo>) {
try {
path.createDirectories()
val serializedBytes = nodeInfo.serialize()
val regSig = signingKey.sign(serializedBytes.bytes)
val signedData = SignedData(serializedBytes, regSig)
signedData.serialize().open().copyTo(
path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${serializedBytes.hash}")
signedNodeInfo.serialize()
.open()
.copyTo(path / "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${signedNodeInfo.raw.hash}")
} catch (e: Exception) {
logger.warn("Couldn't write node info to file", e)
}
@ -66,6 +64,7 @@ class NodeInfoWatcher(private val nodePath: Path,
}
init {
require(pollInterval >= 5.seconds) { "Poll interval must be 5 seconds or longer." }
if (!nodeInfoDirectory.isDirectory()) {
try {
nodeInfoDirectory.createDirectories()
@ -85,10 +84,12 @@ class NodeInfoWatcher(private val nodePath: Path,
* @return an [Observable] returning [NodeInfo]s, at most one [NodeInfo] is returned for each processed file.
*/
fun nodeInfoUpdates(): Observable<NodeInfo> {
return Observable.interval(pollFrequencyMsec, TimeUnit.MILLISECONDS, scheduler)
return Observable.interval(pollInterval.toMillis(), TimeUnit.MILLISECONDS, scheduler)
.flatMapIterable { loadFromDirectory() }
}
fun saveToFile(signedNodeInfo: SignedData<NodeInfo>) = Companion.saveToFile(nodePath, signedNodeInfo)
/**
* Loads all the files contained in a given path and returns the deserialized [NodeInfo]s.
* Signatures are checked before returning a value.
@ -100,10 +101,13 @@ class NodeInfoWatcher(private val nodePath: Path,
return emptyList()
}
val result = nodeInfoDirectory.list { paths ->
paths.filter { it !in successfullyProcessedFiles }
paths.filter { it !in processedNodeInfoFiles }
.filter { it.isRegularFile() }
.map { path ->
processFile(path)?.apply { successfullyProcessedFiles.add(path) }
processFile(path)?.apply {
processedNodeInfoFiles.add(path)
_processedNodeInfoHashes.add(this.serialize().hash)
}
}
.toList()
.filterNotNull()
@ -115,13 +119,13 @@ class NodeInfoWatcher(private val nodePath: Path,
}
private fun processFile(file: Path): NodeInfo? {
try {
return try {
logger.info("Reading NodeInfo from file: $file")
val signedData = file.readAll().deserialize<SignedData<NodeInfo>>()
return signedData.verified()
signedData.verified()
} catch (e: Exception) {
logger.warn("Exception parsing NodeInfo from file. $file", e)
return null
null
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.network
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
@ -16,11 +17,11 @@ import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
@ -63,7 +64,6 @@ class NetworkMapCacheImpl(
@ThreadSafe
open class PersistentNetworkMapCache(
private val database: CordaPersistence,
val configuration: NodeConfiguration,
notaries: List<NotaryInfo>
) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
companion object {
@ -88,17 +88,33 @@ open class PersistentNetworkMapCache(
override val notaryIdentities: List<Party> = notaries.map { it.identity }
private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null }
private val nodeInfoSerializer = NodeInfoWatcher(configuration.baseDirectory,
configuration.additionalNodeInfoPollingFrequencyMsec)
init {
loadFromFiles()
database.transaction { loadFromDB(session) }
}
private fun loadFromFiles() {
logger.info("Loading network map from files..")
nodeInfoSerializer.nodeInfoUpdates().subscribe { node -> addNode(node) }
override val allNodeHashes: List<SecureHash>
get() {
return database.transaction {
val builder = session.criteriaBuilder
val query = builder.createQuery(String::class.java).run {
from(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run {
select(get<String>(NodeInfoSchemaV1.PersistentNodeInfo::hash.name))
}
}
session.createQuery(query).resultList.map { SecureHash.sha256(it) }
}
}
override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? {
return database.transaction {
val builder = session.criteriaBuilder
val query = builder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run {
from(NodeInfoSchemaV1.PersistentNodeInfo::class.java).run {
where(builder.equal(get<String>(NodeInfoSchemaV1.PersistentNodeInfo::hash.name), nodeHash.toString()))
}
}
session.createQuery(query).resultList.singleOrNull()?.toNodeInfo()
}
}
override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries
@ -277,10 +293,12 @@ open class PersistentNetworkMapCache(
else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port")
}
/** Object Relational Mapping support. */
private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo {
return NodeInfoSchemaV1.PersistentNodeInfo(
id = 0,
hash = nodeInfo.serialize().hash.toString(),
addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) },
legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem ->
NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0)

View File

@ -1,6 +1,7 @@
package net.corda.node.utilities.registration
import com.google.common.net.MediaType
import net.corda.core.internal.openHttpConnection
import net.corda.node.utilities.CertificateStream
import org.apache.commons.io.IOUtils
import org.bouncycastle.pkcs.PKCS10CertificationRequest
@ -12,7 +13,9 @@ import java.security.cert.Certificate
import java.util.*
import java.util.zip.ZipInputStream
class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationService {
class HTTPNetworkRegistrationService(compatibilityZoneURL: URL) : NetworkRegistrationService {
private val registrationURL = URL("$compatibilityZoneURL/certificate")
companion object {
// TODO: Propagate version information from gradle
val clientVersion = "1.0"
@ -21,7 +24,7 @@ class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationServi
@Throws(CertificateRequestException::class)
override fun retrieveCertificates(requestId: String): Array<Certificate>? {
// Poll server to download the signed certificate once request has been approved.
val url = URL("$server/api/certificate/$requestId")
val url = URL("$registrationURL/$requestId")
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
@ -43,7 +46,7 @@ class HTTPNetworkRegistrationService(val server: URL) : NetworkRegistrationServi
override fun submitRequest(request: PKCS10CertificationRequest): String {
// Post request to certificate signing server via http.
val conn = URL("$server/api/certificate").openConnection() as HttpURLConnection
val conn = URL("$registrationURL").openHttpConnection()
conn.doOutput = true
conn.requestMethod = "POST"
conn.setRequestProperty("Content-Type", "application/octet-stream")

View File

@ -14,7 +14,6 @@ database = {
initDatabase = true
}
devMode = true
certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
useHTTPS = false
h2port = 0
useTestClock = false

View File

@ -41,7 +41,6 @@ class NodeConfigurationImplTest {
trustStorePassword = "trustpass",
dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation),
database = makeTestDatabaseProperties(),
certificateSigningService = URL("http://localhost"),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,

View File

@ -106,7 +106,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
doReturn(configuration).whenever(it).configuration
doReturn(MonitoringService(MetricRegistry())).whenever(it).monitoringService
doReturn(validatedTransactions).whenever(it).validatedTransactions
doReturn(NetworkMapCacheImpl(MockNetworkMapCache(database, configuration), identityService)).whenever(it).networkMapCache
doReturn(NetworkMapCacheImpl(MockNetworkMapCache(database), identityService)).whenever(it).networkMapCache
doCallRealMethod().whenever(it).signInitialTransaction(any(), any<PublicKey>())
doReturn(myInfo).whenever(it).myInfo
doReturn(kms).whenever(it).keyManagementService

View File

@ -1,6 +1,7 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.nhaarman.mockito_kotlin.mock
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.concurrent.doneFuture
@ -13,6 +14,7 @@ import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
@ -30,6 +32,7 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.mockito.Mockito.mock
import java.net.ServerSocket
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
@ -79,7 +82,7 @@ class ArtemisMessagingTests {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), ::makeTestIdentityService)
networkMapRegistrationFuture = doneFuture(Unit)
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, config, emptyList()), rigorousMock())
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock())
}
@After

View File

@ -8,6 +8,8 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.services.network.TestNodeInfoFactory.createNodeInfo
import net.corda.node.utilities.CertificateType
import net.corda.node.utilities.X509Utilities
import net.corda.testing.SerializationEnvironmentRule
@ -28,6 +30,7 @@ import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.InputStream
import java.net.InetSocketAddress
import java.net.URL
import java.security.cert.CertPath
import java.security.cert.Certificate
import java.security.cert.CertificateFactory
@ -38,7 +41,7 @@ import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.ok
import kotlin.test.assertEquals
class HTTPNetworkMapClientTest {
class NetworkMapClientTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
@ -61,7 +64,7 @@ class HTTPNetworkMapClientTest {
register(MockNetworkMapServer())
}
val jerseyServlet = ServletHolder(ServletContainer(resourceConfig)).apply { initOrder = 0 }// Initialise at server start
addServlet(jerseyServlet, "/api/*")
addServlet(jerseyServlet, "/*")
})
}
}
@ -72,7 +75,7 @@ class HTTPNetworkMapClientTest {
}
val hostAndPort = server.connectors.mapNotNull { it as? ServerConnector }.first()
networkMapClient = HTTPNetworkMapClient("http://${hostAndPort.host}:${hostAndPort.localPort}/api/network-map")
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.localPort}"))
}
@After
@ -90,7 +93,7 @@ class HTTPNetworkMapClientTest {
val nodeInfoHash = nodeInfo.serialize().sha256()
assertThat(networkMapClient.getNetworkMap()).containsExactly(nodeInfoHash)
assertThat(networkMapClient.getNetworkMap().networkMap).containsExactly(nodeInfoHash)
assertEquals(nodeInfo, networkMapClient.getNodeInfo(nodeInfoHash))
val signedNodeInfo2 = createNodeInfo("Test2")
@ -98,27 +101,21 @@ class HTTPNetworkMapClientTest {
networkMapClient.publish(signedNodeInfo2)
val nodeInfoHash2 = nodeInfo2.serialize().sha256()
assertThat(networkMapClient.getNetworkMap()).containsExactly(nodeInfoHash, nodeInfoHash2)
assertThat(networkMapClient.getNetworkMap().networkMap).containsExactly(nodeInfoHash, nodeInfoHash2)
assertEquals(100000.seconds, networkMapClient.getNetworkMap().cacheMaxAge)
assertEquals(nodeInfo2, networkMapClient.getNodeInfo(nodeInfoHash2))
}
private fun createNodeInfo(organisation: String): SignedData<NodeInfo> {
val keyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val clientCert = X509Utilities.createCertificate(CertificateType.CLIENT_CA, intermediateCACert, intermediateCAKey, CordaX500Name(organisation = organisation, locality = "London", country = "GB"), keyPair.public)
val certPath = buildCertPath(clientCert.toX509Certificate(), intermediateCACert.toX509Certificate(), rootCACert.toX509Certificate())
val nodeInfo = NodeInfo(listOf(NetworkHostAndPort("my.$organisation.com", 1234)), listOf(PartyAndCertificate(certPath)), 1, serial = 1L)
// Create digital signature.
val digitalSignature = DigitalSignature.WithKey(keyPair.public, Crypto.doSign(keyPair.private, nodeInfo.serialize().bytes))
return SignedData(nodeInfo.serialize(), digitalSignature)
@Test
fun `get hostname string from http response correctly`() {
assertEquals("test.host.name", networkMapClient.myPublicHostname())
}
}
@Path("network-map")
// This is a stub implementation of the network map rest API.
internal class MockNetworkMapServer {
private val nodeInfos = mutableMapOf<SecureHash, NodeInfo>()
val nodeInfoMap = mutableMapOf<SecureHash, NodeInfo>()
@POST
@Path("publish")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@ -126,33 +123,31 @@ internal class MockNetworkMapServer {
val registrationData = input.readBytes().deserialize<SignedData<NodeInfo>>()
val nodeInfo = registrationData.verified()
val nodeInfoHash = nodeInfo.serialize().sha256()
nodeInfos.put(nodeInfoHash, nodeInfo)
nodeInfoMap.put(nodeInfoHash, nodeInfo)
return ok().build()
}
@GET
@Produces(MediaType.APPLICATION_JSON)
fun getNetworkMap(): Response {
return Response.ok(ObjectMapper().writeValueAsString(nodeInfos.keys.map { it.toString() })).build()
return Response.ok(ObjectMapper().writeValueAsString(nodeInfoMap.keys.map { it.toString() })).header("Cache-Control", "max-age=100000").build()
}
@GET
@Path("{var}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
fun getNodeInfo(@PathParam("var") nodeInfoHash: String): Response {
val nodeInfo = nodeInfos[SecureHash.parse(nodeInfoHash)]
val nodeInfo = nodeInfoMap[SecureHash.parse(nodeInfoHash)]
return if (nodeInfo != null) {
Response.ok(nodeInfo.serialize().bytes)
} else {
Response.status(Response.Status.NOT_FOUND)
}.build()
}
}
private fun buildCertPath(vararg certificates: Certificate): CertPath {
return CertificateFactory.getInstance("X509").generateCertPath(certificates.asList())
@GET
@Path("my-hostname")
fun getHostName(): Response {
return Response.ok("test.host.name").build()
}
}
private fun X509CertificateHolder.toX509Certificate(): X509Certificate {
return CertificateFactory.getInstance("X509").generateCertificate(ByteArrayInputStream(encoded)) as X509Certificate
}

View File

@ -0,0 +1,232 @@
package net.corda.node.services.network
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.times
import com.nhaarman.mockito_kotlin.verify
import net.corda.cordform.CordformNode
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.identity.Party
import net.corda.core.internal.div
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.millis
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.testing.SerializationEnvironmentRule
import org.junit.Rule
import org.junit.Test
import rx.schedulers.TestScheduler
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class NetworkMapUpdaterTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val jimFs = Jimfs.newFileSystem(Configuration.unix())
private val baseDir = jimFs.getPath("/node")
@Test
fun `publish node info`() {
val keyPair = Crypto.generateKeyPair()
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1").verified()
val signedNodeInfo = TestNodeInfoFactory.sign(keyPair, nodeInfo1)
val sameNodeInfoDifferentTime = nodeInfo1.copy(serial = System.currentTimeMillis())
val signedSameNodeInfoDifferentTime = TestNodeInfoFactory.sign(keyPair, sameNodeInfoDifferentTime)
val differentNodeInfo = nodeInfo1.copy(addresses = listOf(NetworkHostAndPort("my.new.host.com", 1000)))
val signedDifferentNodeInfo = TestNodeInfoFactory.sign(keyPair, differentNodeInfo)
val networkMapCache = getMockNetworkMapCache()
val networkMapClient = mock<NetworkMapClient>()
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Publish node info for the first time.
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo }
// Sleep as publish is asynchronous.
// TODO: Remove sleep in unit test
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(any())
networkMapCache.addNode(nodeInfo1)
// Publish the same node info, but with different serial.
updater.updateNodeInfo(sameNodeInfoDifferentTime) { signedSameNodeInfoDifferentTime }
// TODO: Remove sleep in unit test.
Thread.sleep(200)
// Same node info should not publish twice
verify(networkMapClient, times(0)).publish(signedSameNodeInfoDifferentTime)
// Publish different node info.
updater.updateNodeInfo(differentNodeInfo) { signedDifferentNodeInfo }
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(signedDifferentNodeInfo)
updater.close()
}
@Test
fun `process add node updates from network map, with additional node infos from dir`() {
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1")
val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2")
val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3")
val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4")
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
val networkMapCache = getMockNetworkMapCache()
val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedData<NodeInfo>>()
val networkMapClient = mock<NetworkMapClient> {
on { publish(any()) }.then {
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
}
on { getNetworkMap() }.then { NetworkMapResponse(nodeInfoMap.keys.toList(), 100.millis) }
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
}
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Test adding new node.
networkMapClient.publish(nodeInfo1)
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
networkMapClient.publish(nodeInfo2)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapCache, times(2)).addNode(any())
verify(networkMapCache, times(1)).addNode(nodeInfo1.verified())
verify(networkMapCache, times(1)).addNode(nodeInfo2.verified())
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
networkMapClient.publish(nodeInfo3)
networkMapClient.publish(nodeInfo4)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
// 4 node info from network map, and 1 from file.
verify(networkMapCache, times(5)).addNode(any())
verify(networkMapCache, times(1)).addNode(nodeInfo3.verified())
verify(networkMapCache, times(1)).addNode(nodeInfo4.verified())
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
updater.close()
}
@Test
fun `process remove node updates from network map, with additional node infos from dir`() {
val nodeInfo1 = TestNodeInfoFactory.createNodeInfo("Info 1")
val nodeInfo2 = TestNodeInfoFactory.createNodeInfo("Info 2")
val nodeInfo3 = TestNodeInfoFactory.createNodeInfo("Info 3")
val nodeInfo4 = TestNodeInfoFactory.createNodeInfo("Info 4")
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
val networkMapCache = getMockNetworkMapCache()
val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedData<NodeInfo>>()
val networkMapClient = mock<NetworkMapClient> {
on { publish(any()) }.then {
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
}
on { getNetworkMap() }.then { NetworkMapResponse(nodeInfoMap.keys.toList(), 100.millis) }
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
}
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
// Add all nodes.
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
networkMapClient.publish(nodeInfo1)
networkMapClient.publish(nodeInfo2)
networkMapClient.publish(nodeInfo3)
networkMapClient.publish(nodeInfo4)
updater.subscribeToNetworkMap()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
// 4 node info from network map, and 1 from file.
assertEquals(4, nodeInfoMap.size)
verify(networkMapCache, times(5)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
// Test remove node.
nodeInfoMap.clear()
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapCache, times(4)).removeNode(any())
verify(networkMapCache, times(1)).removeNode(nodeInfo1.verified())
verify(networkMapCache, times(1)).removeNode(nodeInfo2.verified())
verify(networkMapCache, times(1)).removeNode(nodeInfo3.verified())
verify(networkMapCache, times(1)).removeNode(nodeInfo4.verified())
// Node info from file should not be deleted
assertEquals(1, networkMapCache.allNodeHashes.size)
assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first())
updater.close()
}
@Test
fun `receive node infos from directory, without a network map`() {
val fileNodeInfo = TestNodeInfoFactory.createNodeInfo("Info from file")
val networkMapCache = getMockNetworkMapCache()
val scheduler = TestScheduler()
val fileWatcher = NodeInfoWatcher(baseDir, scheduler = scheduler)
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null)
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).addNode(any())
verify(networkMapCache, times(1)).addNode(fileNodeInfo.verified())
assertEquals(1, networkMapCache.allNodeHashes.size)
assertEquals(fileNodeInfo.verified().serialize().hash, networkMapCache.allNodeHashes.first())
updater.close()
}
private fun getMockNetworkMapCache() = mock<NetworkMapCacheInternal> {
val data = ConcurrentHashMap<Party, NodeInfo>()
on { addNode(any()) }.then {
val nodeInfo = it.arguments.first() as NodeInfo
data.put(nodeInfo.legalIdentities.first(), nodeInfo)
}
on { removeNode(any()) }.then { data.remove((it.arguments.first() as NodeInfo).legalIdentities.first()) }
on { getNodeByLegalIdentity(any()) }.then { data[it.arguments.first()] }
on { allNodeHashes }.then { data.values.map { it.serialize().hash } }
on { getNodeByHash(any()) }.then { mock -> data.values.single { it.serialize().hash == mock.arguments.first() } }
}
}

View File

@ -0,0 +1,50 @@
package net.corda.node.services.network
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SignedData
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.utilities.CertificateType
import net.corda.node.utilities.X509Utilities
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.X509CertificateHolder
import java.io.ByteArrayInputStream
import java.security.KeyPair
import java.security.cert.CertPath
import java.security.cert.Certificate
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
object TestNodeInfoFactory {
private val rootCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
private val rootCACert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Corda Node Root CA", organisation = "R3 LTD", locality = "London", country = "GB"), rootCAKey)
private val intermediateCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
private val intermediateCACert = X509Utilities.createCertificate(CertificateType.INTERMEDIATE_CA, rootCACert, rootCAKey, X500Name("CN=Corda Node Intermediate CA,L=London"), intermediateCAKey.public)
fun createNodeInfo(organisation: String): SignedData<NodeInfo> {
val keyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val clientCert = X509Utilities.createCertificate(CertificateType.CLIENT_CA, intermediateCACert, intermediateCAKey, CordaX500Name(organisation = organisation, locality = "London", country = "GB"), keyPair.public)
val certPath = buildCertPath(clientCert.toX509Certificate(), intermediateCACert.toX509Certificate(), rootCACert.toX509Certificate())
val nodeInfo = NodeInfo(listOf(NetworkHostAndPort("my.$organisation.com", 1234)), listOf(PartyAndCertificate(certPath)), 1, serial = 1L)
return sign(keyPair, nodeInfo)
}
fun <T : Any> sign(keyPair: KeyPair, t: T): SignedData<T> {
// Create digital signature.
val digitalSignature = DigitalSignature.WithKey(keyPair.public, Crypto.doSign(keyPair.private, t.serialize().bytes))
return SignedData(t.serialize(), digitalSignature)
}
private fun buildCertPath(vararg certificates: Certificate): CertPath {
return CertificateFactory.getInstance("X509").generateCertPath(certificates.asList())
}
private fun X509CertificateHolder.toX509Certificate(): X509Certificate {
return CertificateFactory.getInstance("X509").generateCertificate(ByteArrayInputStream(encoded)) as X509Certificate
}
}

View File

@ -0,0 +1 @@
mock-maker-inline