mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Wire part of network parameters (#2187)
* Take maximum message size from network parameters * Add epoch handling * Add handling of network parameters mismatch Change NetworkMapClient and updater, add handle in AbstractNode that results in node shutdown on parameters mismatch. Later on we should implement proper handling of parameters updates. Add tests of NetworkParameters wiring. When node starts with compatibilityZone url configured it takes networkParameters from the networkMap. * Permit only one network parameters file On node startup network parameters are read from node's base directory, we permit only zero or one files to be there. If network map server is configured the parameters can be downloaded at startup (if not present in the directory already). * Update docs on network map endpoints
This commit is contained in:
committed by
GitHub
parent
90f6cd1fe7
commit
550469ea38
@ -1,9 +1,16 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.internal.readAll
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.NetworkParameters
|
||||
import net.corda.testing.node.internal.CompatibilityZoneParams
|
||||
import net.corda.testing.ALICE_NAME
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.BOB_NAME
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
@ -12,10 +19,17 @@ import net.corda.testing.node.network.NetworkMapServer
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.net.URL
|
||||
import java.nio.file.Files
|
||||
import kotlin.streams.toList
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
private val cacheTimeout = 1.seconds
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
|
||||
@ -34,9 +48,20 @@ class NetworkMapTest {
|
||||
networkMapServer.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `node correctly downloads and saves network parameters file on startup`() {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone, initialiseSerialization = false) {
|
||||
val aliceDir = baseDirectory(ALICE_NAME)
|
||||
startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val networkParameters = Files.list(aliceDir).toList().single { NETWORK_PARAMS_FILE_NAME == it.fileName.toString() }
|
||||
.readAll().deserialize<SignedData<NetworkParameters>>().verified()
|
||||
assertEquals(NetworkMapServer.stubNetworkParameter, networkParameters)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `nodes can see each other using the http network map`() {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone) {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone, initialiseSerialization = false) {
|
||||
val alice = startNode(providedName = ALICE_NAME)
|
||||
val bob = startNode(providedName = BOB_NAME)
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
@ -51,7 +76,7 @@ class NetworkMapTest {
|
||||
|
||||
@Test
|
||||
fun `nodes process network map add updates correctly when adding new node to network map`() {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone) {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone, initialiseSerialization = false) {
|
||||
val alice = startNode(providedName = ALICE_NAME)
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
val aliceNode = alice.get()
|
||||
@ -72,7 +97,7 @@ class NetworkMapTest {
|
||||
|
||||
@Test
|
||||
fun `nodes process network map remove updates correctly`() {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone) {
|
||||
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone, initialiseSerialization = false) {
|
||||
val alice = startNode(providedName = ALICE_NAME)
|
||||
val bob = startNode(providedName = BOB_NAME)
|
||||
val notaryNode = defaultNotaryNode.get()
|
||||
|
@ -14,6 +14,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_CLIENT_CA
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_INTERMEDIATE_CA
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.node.internal.CompatibilityZoneParams
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.node.internal.internalDriver
|
||||
@ -24,6 +25,7 @@ import org.bouncycastle.pkcs.PKCS10CertificationRequest
|
||||
import org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequest
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.InputStream
|
||||
@ -38,6 +40,9 @@ import javax.ws.rs.core.MediaType
|
||||
import javax.ws.rs.core.Response
|
||||
|
||||
class NodeRegistrationTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
private val portAllocation = PortAllocation.Incremental(13000)
|
||||
private val rootCertAndKeyPair = createSelfKeyAndSelfSignedCertificate()
|
||||
private val registrationHandler = RegistrationHandler(rootCertAndKeyPair)
|
||||
@ -47,7 +52,7 @@ class NodeRegistrationTest {
|
||||
|
||||
@Before
|
||||
fun startServer() {
|
||||
server = NetworkMapServer(1.minutes, portAllocation.nextHostAndPort(), registrationHandler)
|
||||
server = NetworkMapServer(1.minutes, portAllocation.nextHostAndPort(), rootCertAndKeyPair, registrationHandler)
|
||||
serverHostAndPort = server.start()
|
||||
}
|
||||
|
||||
@ -64,7 +69,8 @@ class NodeRegistrationTest {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
notarySpecs = emptyList(),
|
||||
compatibilityZone = compatibilityZone
|
||||
compatibilityZone = compatibilityZone,
|
||||
initialiseSerialization = false
|
||||
) {
|
||||
startNode(providedName = CordaX500Name("Alice", "London", "GB")).getOrThrow()
|
||||
assertThat(registrationHandler.idsPolled).contains("Alice")
|
||||
|
@ -57,6 +57,7 @@ import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.shell.InteractiveShell
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.NETWORK_PARAMS_FILE_NAME
|
||||
import net.corda.nodeapi.internal.NetworkParameters
|
||||
import net.corda.nodeapi.internal.crypto.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -69,6 +70,7 @@ import rx.Observable
|
||||
import rx.Scheduler
|
||||
import java.io.IOException
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.nio.file.Files
|
||||
import java.security.KeyPair
|
||||
import java.security.KeyStoreException
|
||||
import java.security.PublicKey
|
||||
@ -82,6 +84,7 @@ import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.collections.set
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.streams.toList
|
||||
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
||||
|
||||
/**
|
||||
@ -135,11 +138,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
protected lateinit var network: MessagingService
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
protected val _nodeReadyFuture = openFuture<Unit>()
|
||||
protected val networkMapClient: NetworkMapClient? by lazy {
|
||||
configuration.compatibilityZoneURL?.let {
|
||||
NetworkMapClient(it, services.identityService.trustRoot)
|
||||
}
|
||||
}
|
||||
protected var networkMapClient: NetworkMapClient? = null
|
||||
|
||||
lateinit var securityManager: RPCSecurityManager get
|
||||
|
||||
@ -197,10 +196,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
check(started == null) { "Node has already been started" }
|
||||
log.info("Node starting up ...")
|
||||
initCertificate()
|
||||
readNetworkParameters()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
val identityService = makeIdentityService(identity.certificate)
|
||||
networkMapClient = configuration.compatibilityZoneURL?.let {
|
||||
NetworkMapClient(it, identityService.trustRoot)
|
||||
}
|
||||
readNetworkParameters()
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService)
|
||||
@ -238,10 +240,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
startShell(rpcOps)
|
||||
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
}
|
||||
|
||||
val networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient)
|
||||
networkMapClient,
|
||||
networkParameters.serialize().hash)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
networkMapUpdater.updateNodeInfo(services.myInfo) {
|
||||
@ -636,10 +638,28 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun readNetworkParameters() {
|
||||
val file = configuration.baseDirectory / "network-parameters"
|
||||
networkParameters = file.readAll().deserialize<SignedData<NetworkParameters>>().verified()
|
||||
log.info(networkParameters.toString())
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node is too old for the network" }
|
||||
val files = Files.list(configuration.baseDirectory).filter { NETWORK_PARAMS_FILE_NAME == it.fileName.toString() }.toList()
|
||||
val paramsFromFile = try {
|
||||
// It's fine at this point if we don't have network parameters or have corrupted file, later we check if parameters can be downloaded from network map server.
|
||||
files[0].readAll().deserialize<SignedData<NetworkParameters>>().verified()
|
||||
} catch (t: Exception) {
|
||||
log.warn("Couldn't find correct network parameters file in the base directory")
|
||||
null
|
||||
}
|
||||
networkParameters = if (paramsFromFile != null) {
|
||||
paramsFromFile
|
||||
} else if (networkMapClient != null) {
|
||||
log.info("Requesting network parameters from network map server...")
|
||||
val (networkMap, _) = networkMapClient!!.getNetworkMap()
|
||||
val signedParams = networkMapClient!!.getNetworkParameter(networkMap.networkParameterHash) ?: throw IllegalArgumentException("Failed loading network parameters from network map server")
|
||||
val verifiedParams = signedParams.verified() // Verify before saving.
|
||||
signedParams.serialize().open().copyTo(configuration.baseDirectory / NETWORK_PARAMS_FILE_NAME)
|
||||
verifiedParams
|
||||
} else {
|
||||
throw IllegalArgumentException("Couldn't load network parameters file")
|
||||
}
|
||||
log.info("Loaded network parameters $networkParameters")
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node's platform version is lower than network's required minimumPlatformVersion" }
|
||||
}
|
||||
|
||||
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
|
||||
|
@ -144,9 +144,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
val advertisedAddress = info.addresses.single()
|
||||
|
||||
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
||||
rpcMessagingClient = RPCMessagingClient(configuration, serverAddress)
|
||||
rpcMessagingClient = RPCMessagingClient(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||
verifierMessagingClient = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics)
|
||||
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize)
|
||||
VerifierType.InMemory -> null
|
||||
}
|
||||
return P2PMessagingClient(
|
||||
@ -156,12 +156,13 @@ open class Node(configuration: NodeConfiguration,
|
||||
info.legalIdentities[0].owningKey,
|
||||
serverThread,
|
||||
database,
|
||||
advertisedAddress)
|
||||
advertisedAddress,
|
||||
networkParameters.maxMessageSize)
|
||||
}
|
||||
|
||||
private fun makeLocalMessageBroker(): NetworkHostAndPort {
|
||||
with(configuration) {
|
||||
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, securityManager)
|
||||
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, rpcAddress?.port, services.networkMapCache, securityManager, networkParameters.maxMessageSize)
|
||||
return NetworkHostAndPort("localhost", p2pAddress.port)
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
||||
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort) {
|
||||
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingClient>()
|
||||
}
|
||||
@ -30,7 +30,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration, private val s
|
||||
// would be the default and the two lines below can be deleted.
|
||||
connectionTTL = -1
|
||||
clientFailureCheckPeriod = -1
|
||||
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
|
||||
minLargeMessageSize = maxMessageSize
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
}
|
||||
val sessionFactory = locator.createSessionFactory()
|
||||
|
@ -101,14 +101,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val p2pPort: Int,
|
||||
val rpcPort: Int?,
|
||||
val networkMapCache: NetworkMapCache,
|
||||
val securityManager: RPCSecurityManager) : SingletonSerializeAsToken() {
|
||||
val securityManager: RPCSecurityManager,
|
||||
val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
||||
@JvmStatic
|
||||
val MAX_FILE_SIZE = 10485760
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
var running = false
|
||||
}
|
||||
@ -181,9 +178,9 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||
isPersistIDCache = true
|
||||
isPopulateValidatedUser = true
|
||||
journalBufferSize_NIO = MAX_FILE_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
|
||||
journalBufferSize_AIO = MAX_FILE_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
||||
journalFileSize = MAX_FILE_SIZE // The size of each journal file in bytes. Artemis default is 10MiB.
|
||||
journalBufferSize_NIO = maxMessageSize // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
|
||||
journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
|
||||
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB.
|
||||
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
|
||||
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
|
||||
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its
|
||||
@ -211,7 +208,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
)
|
||||
addressesSettings = mapOf(
|
||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||
maxSizeBytes = 10L * MAX_FILE_SIZE
|
||||
maxSizeBytes = 10L * maxMessageSize
|
||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
||||
}
|
||||
)
|
||||
|
@ -68,7 +68,8 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
private val myIdentity: PublicKey,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress,
|
||||
private val maxMessageSize: Int
|
||||
) : SingletonSerializeAsToken(), MessagingService {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
@ -146,7 +147,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
|
||||
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
|
||||
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
|
@ -12,8 +12,8 @@ import net.corda.nodeapi.internal.crypto.getX509Certificate
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
|
||||
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort) : SingletonSerializeAsToken() {
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
private var rpcServer: RPCServer? = null
|
||||
|
||||
fun start(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) {
|
||||
|
@ -17,13 +17,13 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.util.concurrent.*
|
||||
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry) : SingletonSerializeAsToken() {
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<VerifierMessagingClient>()
|
||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||
}
|
||||
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||
/** An executor for sending messages */
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
private var verificationResponseConsumer: ClientConsumer? = null
|
||||
|
@ -66,7 +66,7 @@ class NetworkMapClient(compatibilityZoneURL: URL, private val trustedRoot: X509C
|
||||
}
|
||||
}
|
||||
|
||||
fun getNetworkParameter(networkParameterHash: SecureHash): NetworkParameters? {
|
||||
fun getNetworkParameter(networkParameterHash: SecureHash): SignedData<NetworkParameters>? {
|
||||
val conn = URL("$networkMapUrl/network-parameter/$networkParameterHash").openHttpConnection()
|
||||
return if (conn.responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
|
||||
null
|
||||
@ -85,7 +85,8 @@ data class NetworkMapResponse(val networkMap: NetworkMap, val cacheMaxAge: Durat
|
||||
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
private val networkMapClient: NetworkMapClient?) : Closeable {
|
||||
private val networkMapClient: NetworkMapClient?,
|
||||
private val currentParametersHash: SecureHash) : Closeable {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
private val retryInterval = 1.minutes
|
||||
@ -125,6 +126,12 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
override fun run() {
|
||||
val nextScheduleDelay = try {
|
||||
val (networkMap, cacheTimeout) = networkMapClient.getNetworkMap()
|
||||
// TODO NetworkParameters updates are not implemented yet. Every mismatch should result in node shutdown.
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Please update node to use correct network parameters file.\"")
|
||||
System.exit(1)
|
||||
}
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
val hashesFromNetworkMap = networkMap.nodeInfoHashes
|
||||
(hashesFromNetworkMap - currentNodeHashes).mapNotNull {
|
||||
@ -144,7 +151,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
(currentNodeHashes - hashesFromNetworkMap - fileWatcher.processedNodeInfoHashes)
|
||||
.mapNotNull(networkMapCache::getNodeByHash)
|
||||
.forEach(networkMapCache::removeNode)
|
||||
// TODO: Check NetworkParameter.
|
||||
cacheTimeout
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while updating network map, will retry in ${retryInterval.seconds} seconds", t)
|
||||
|
@ -207,9 +207,6 @@ open class PersistentNetworkMapCache(
|
||||
getAllInfos(session).map { it.toNodeInfo() }
|
||||
}
|
||||
|
||||
// Changes related to NetworkMap redesign
|
||||
// TODO It will be properly merged into network map cache after services removal.
|
||||
|
||||
private fun getAllInfos(session: Session): List<NodeInfoSchemaV1.PersistentNodeInfo> {
|
||||
val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java)
|
||||
criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java))
|
||||
@ -292,7 +289,6 @@ open class PersistentNetworkMapCache(
|
||||
else result.map { it.toNodeInfo() }.singleOrNull() ?: throw IllegalStateException("More than one node with the same host and port")
|
||||
}
|
||||
|
||||
|
||||
/** Object Relational Mapping support. */
|
||||
private fun generateMappedObject(nodeInfo: NodeInfo): NodeInfoSchemaV1.PersistentNodeInfo {
|
||||
return NodeInfoSchemaV1.PersistentNodeInfo(
|
||||
|
@ -0,0 +1,73 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.nodeapi.internal.NetworkParameters
|
||||
import net.corda.nodeapi.internal.NetworkParametersCopier
|
||||
import net.corda.nodeapi.internal.NotaryInfo
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.node.*
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import java.nio.file.Path
|
||||
import kotlin.test.assertFails
|
||||
import org.assertj.core.api.Assertions.*
|
||||
|
||||
class NetworkParametersTest {
|
||||
private val mockNet = MockNetwork(
|
||||
MockNetworkParameters(networkSendManuallyPumped = true),
|
||||
notarySpecs = listOf(MockNetwork.NotarySpec(DUMMY_NOTARY_NAME)))
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
// Minimum Platform Version tests
|
||||
@Test
|
||||
fun `node shutdowns when on lower platform version than network`() {
|
||||
val alice = mockNet.createUnstartedNode(MockNodeParameters(legalName = ALICE_NAME, forcedID = 100, version = MockServices.MOCK_VERSION_INFO.copy(platformVersion = 1)))
|
||||
val aliceDirectory = mockNet.baseDirectory(100)
|
||||
val netParams = testNetworkParameters(
|
||||
notaries = listOf(NotaryInfo(mockNet.defaultNotaryIdentity, true)),
|
||||
minimumPlatformVersion = 2)
|
||||
dropParametersToDir(aliceDirectory, netParams)
|
||||
assertThatThrownBy { alice.start() }.hasMessageContaining("platform version")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `node works fine when on higher platform version`() {
|
||||
val alice = mockNet.createUnstartedNode(MockNodeParameters(legalName = ALICE_NAME, forcedID = 100, version = MockServices.MOCK_VERSION_INFO.copy(platformVersion = 2)))
|
||||
val aliceDirectory = mockNet.baseDirectory(100)
|
||||
val netParams = testNetworkParameters(
|
||||
notaries = listOf(NotaryInfo(mockNet.defaultNotaryIdentity, true)),
|
||||
minimumPlatformVersion = 1)
|
||||
dropParametersToDir(aliceDirectory, netParams)
|
||||
alice.start()
|
||||
}
|
||||
|
||||
// Notaries tests
|
||||
@Test
|
||||
fun `choosing notary not specified in network parameters will fail`() {
|
||||
val fakeNotary = mockNet.createNode(MockNodeParameters(legalName = BOB_NAME, configOverrides = {
|
||||
val notary = NotaryConfig(false)
|
||||
doReturn(notary).whenever(it).notary}))
|
||||
val fakeNotaryId = fakeNotary.info.chooseIdentity()
|
||||
val alice = mockNet.createPartyNode(ALICE_NAME)
|
||||
assertThat(alice.services.networkMapCache.notaryIdentities).doesNotContain(fakeNotaryId)
|
||||
assertFails {
|
||||
alice.services.startFlow(CashIssueFlow(500.DOLLARS, OpaqueBytes.of(0x01), fakeNotaryId)).resultFuture.getOrThrow()
|
||||
}
|
||||
}
|
||||
|
||||
// Helpers
|
||||
private fun dropParametersToDir(dir: Path, params: NetworkParameters) {
|
||||
NetworkParametersCopier(params).install(dir)
|
||||
}
|
||||
}
|
@ -163,7 +163,7 @@ class ArtemisMessagingTests {
|
||||
return Pair(messagingClient, receivedMessages)
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort), platformVersion: Int = 1): P2PMessagingClient {
|
||||
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort), platformVersion: Int = 1, maxMessageSize: Int = MAX_MESSAGE_SIZE): P2PMessagingClient {
|
||||
return database.transaction {
|
||||
P2PMessagingClient(
|
||||
config,
|
||||
@ -171,16 +171,16 @@ class ArtemisMessagingTests {
|
||||
server,
|
||||
identity.public,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database
|
||||
).apply {
|
||||
database,
|
||||
maxMessageSize = maxMessageSize).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: Int = serverPort, rpc: Int = rpcPort): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, rpc, networkMapCache, securityManager).apply {
|
||||
private fun createMessagingServer(local: Int = serverPort, rpc: Int = rpcPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(config, local, rpc, networkMapCache, securityManager, maxMessageSize).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
@ -6,9 +6,7 @@ import net.corda.core.internal.cert
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.network.TestNodeInfoFactory.createNodeInfo
|
||||
import net.corda.testing.DEV_CA
|
||||
import net.corda.testing.DEV_TRUST_ROOT
|
||||
import net.corda.testing.ROOT_CA
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.node.network.NetworkMapServer
|
||||
@ -71,7 +69,7 @@ class NetworkMapClientTest {
|
||||
@Test
|
||||
fun `download NetworkParameter correctly`() {
|
||||
// The test server returns same network parameter for any hash.
|
||||
val networkParameter = networkMapClient.getNetworkParameter(SecureHash.randomSHA256())
|
||||
val networkParameter = networkMapClient.getNetworkParameter(SecureHash.randomSHA256())?.verified()
|
||||
assertNotNull(networkParameter)
|
||||
assertEquals(NetworkMapServer.stubNetworkParameter, networkParameter)
|
||||
}
|
||||
|
@ -28,6 +28,10 @@ import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NetworkMapUpdaterTest {
|
||||
companion object {
|
||||
val NETWORK_PARAMS_HASH = SecureHash.randomSHA256()
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
@ -53,7 +57,7 @@ class NetworkMapUpdaterTest {
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
|
||||
// Publish node info for the first time.
|
||||
updater.updateNodeInfo(nodeInfo1) { signedNodeInfo }
|
||||
@ -96,13 +100,13 @@ class NetworkMapUpdaterTest {
|
||||
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), SecureHash.randomSHA256()), 100.millis) }
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), NETWORK_PARAMS_HASH), 100.millis) }
|
||||
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
|
||||
// Test adding new node.
|
||||
networkMapClient.publish(nodeInfo1)
|
||||
@ -150,13 +154,13 @@ class NetworkMapUpdaterTest {
|
||||
val signedNodeInfo: SignedData<NodeInfo> = uncheckedCast(it.arguments.first())
|
||||
nodeInfoMap.put(signedNodeInfo.verified().serialize().hash, signedNodeInfo)
|
||||
}
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), SecureHash.randomSHA256()), 100.millis) }
|
||||
on { getNetworkMap() }.then { NetworkMapResponse(NetworkMap(nodeInfoMap.keys.toList(), NETWORK_PARAMS_HASH), 100.millis) }
|
||||
on { getNodeInfo(any()) }.then { nodeInfoMap[it.arguments.first()]?.verified() }
|
||||
}
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, NETWORK_PARAMS_HASH)
|
||||
|
||||
// Add all nodes.
|
||||
NodeInfoWatcher.saveToFile(baseDir / CordformNode.NODE_INFO_DIRECTORY, fileNodeInfo)
|
||||
@ -200,7 +204,7 @@ class NetworkMapUpdaterTest {
|
||||
|
||||
val scheduler = TestScheduler()
|
||||
val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null)
|
||||
val updater = NetworkMapUpdater(networkMapCache, fileWatcher, null, NETWORK_PARAMS_HASH)
|
||||
|
||||
// Not subscribed yet.
|
||||
verify(networkMapCache, times(0)).addNode(any())
|
||||
|
Reference in New Issue
Block a user