mirror of
https://github.com/corda/corda.git
synced 2024-12-26 16:11:12 +00:00
Merge remote-tracking branch 'open/master' into shams-os-merge-0af42bd
# Conflicts: # node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
This commit is contained in:
commit
6283462548
@ -1,12 +1,12 @@
|
||||
Changelog
|
||||
=========
|
||||
|
||||
Here's a summary of what's changed in each Corda release. For guidance on how to upgrade code from the previous
|
||||
release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
----------
|
||||
|
||||
Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code
|
||||
from the previous milestone release.
|
||||
|
||||
* Errors thrown by a Corda node will now reported to a calling RPC client with attention to serialization and obfuscation of internal data.
|
||||
|
||||
* Serializing an inner class (non-static nested class in Java, inner class in Kotlin) will be rejected explicitly by the serialization
|
||||
|
@ -369,9 +369,9 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
||||
payload,
|
||||
link.source.address,
|
||||
remoteLegalName,
|
||||
NetworkHostAndPort(localAddress.hostString, localAddress.port),
|
||||
localLegalName,
|
||||
NetworkHostAndPort(remoteAddress.hostString, remoteAddress.port),
|
||||
localLegalName,
|
||||
NetworkHostAndPort(localAddress.hostString, localAddress.port),
|
||||
appProperties,
|
||||
channel,
|
||||
delivery)
|
||||
|
@ -149,10 +149,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
is SendableMessageImpl -> {
|
||||
val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
|
||||
require(inetAddress == remoteAddress) {
|
||||
"Message for incorrect endpoint"
|
||||
"Message for incorrect endpoint $inetAddress expected $remoteAddress"
|
||||
}
|
||||
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
|
||||
"Message for incorrect legal identity"
|
||||
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
|
||||
}
|
||||
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
|
||||
eventProcessor!!.transportWriteMessage(msg)
|
||||
|
@ -12,11 +12,9 @@ package net.corda.node.utilities.registration
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.logElapsedTime
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.flows.CashIssueAndPaymentFlow
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
@ -62,6 +60,7 @@ class NodeRegistrationTest : IntegrationTest() {
|
||||
private val notaryName = CordaX500Name("NotaryService", "Zurich", "CH")
|
||||
private val aliceName = CordaX500Name("Alice", "London", "GB")
|
||||
private val genevieveName = CordaX500Name("Genevieve", "London", "GB")
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
@Rule
|
||||
@ -76,7 +75,7 @@ class NodeRegistrationTest : IntegrationTest() {
|
||||
@Before
|
||||
fun startServer() {
|
||||
server = NetworkMapServer(
|
||||
cacheTimeout = 1.minutes,
|
||||
pollInterval = 1.seconds,
|
||||
hostAndPort = portAllocation.nextHostAndPort(),
|
||||
myHostNameValue = "localhost",
|
||||
additionalServices = registrationHandler)
|
||||
@ -106,6 +105,9 @@ class NodeRegistrationTest : IntegrationTest() {
|
||||
startNode(providedName = genevieveName),
|
||||
defaultNotaryNode
|
||||
).transpose().getOrThrow()
|
||||
|
||||
log.info("Nodes started")
|
||||
|
||||
val (alice, genevieve) = nodes
|
||||
|
||||
assertThat(registrationHandler.idsPolled).containsOnly(
|
||||
@ -132,25 +134,33 @@ class RegistrationHandler(private val rootCertAndKeyPair: CertificateAndKeyPair)
|
||||
private val certPaths = HashMap<String, CertPath>()
|
||||
val idsPolled = HashSet<String>()
|
||||
|
||||
companion object {
|
||||
val log = loggerFor<RegistrationHandler>()
|
||||
}
|
||||
|
||||
@POST
|
||||
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
|
||||
@Produces(MediaType.TEXT_PLAIN)
|
||||
fun registration(input: InputStream): Response {
|
||||
val certificationRequest = input.use { JcaPKCS10CertificationRequest(it.readBytes()) }
|
||||
val (certPath, name) = createSignedClientCertificate(
|
||||
certificationRequest,
|
||||
rootCertAndKeyPair.keyPair,
|
||||
listOf(rootCertAndKeyPair.certificate))
|
||||
require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" }
|
||||
certPaths[name.organisation] = certPath
|
||||
return Response.ok(name.organisation).build()
|
||||
return log.logElapsedTime("Registration") {
|
||||
val certificationRequest = input.use { JcaPKCS10CertificationRequest(it.readBytes()) }
|
||||
val (certPath, name) = createSignedClientCertificate(
|
||||
certificationRequest,
|
||||
rootCertAndKeyPair.keyPair,
|
||||
listOf(rootCertAndKeyPair.certificate))
|
||||
require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" }
|
||||
certPaths[name.organisation] = certPath
|
||||
Response.ok(name.organisation).build()
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("{id}")
|
||||
fun reply(@PathParam("id") id: String): Response {
|
||||
idsPolled += id
|
||||
return buildResponse(certPaths[id]!!.certificates)
|
||||
return log.logElapsedTime("Reply by Id") {
|
||||
idsPolled += id
|
||||
buildResponse(certPaths[id]!!.certificates)
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildResponse(certificates: List<Certificate>): Response {
|
||||
|
@ -134,9 +134,11 @@ import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.JVMAgentRegistry
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.NodeBuildProperties
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -222,7 +224,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
private val _nodeReadyFuture = openFuture<Unit>()
|
||||
protected var networkMapClient: NetworkMapClient? = null
|
||||
protected lateinit var networkMapUpdater: NetworkMapUpdater
|
||||
private lateinit var networkMapUpdater: NetworkMapUpdater
|
||||
lateinit var securityManager: RPCSecurityManager
|
||||
|
||||
private val shutdownExecutor = Executors.newSingleThreadExecutor()
|
||||
@ -261,15 +263,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
|
||||
it.transaction {
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
|
||||
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
|
||||
// like a design smell.
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
|
||||
persistentNetworkMapCache.start()
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
|
||||
nodeInfo
|
||||
}
|
||||
}
|
||||
@ -283,15 +281,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
val identityService = makeIdentityService(identity.certificate)
|
||||
|
||||
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
|
||||
|
||||
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
|
||||
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
|
||||
// Do all of this in a database transaction so anything that might need a connection has one.
|
||||
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
|
||||
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
|
||||
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
|
||||
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
|
||||
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
|
||||
val metrics = MetricRegistry()
|
||||
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
|
||||
@ -342,6 +343,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
startShell()
|
||||
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
|
||||
}
|
||||
|
||||
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
|
||||
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
|
||||
networkMapClient,
|
||||
@ -349,15 +351,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
configuration.baseDirectory)
|
||||
runOnStop += networkMapUpdater::close
|
||||
|
||||
log.info("Node-info for this node: ${services.myInfo}")
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
|
||||
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
|
||||
}
|
||||
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
|
||||
networkMapUpdater.subscribeToNetworkMap()
|
||||
|
||||
// If we successfully loaded network data from database, we set this future to Unit.
|
||||
// If we successfully loaded network data from database, we set this future to Unit.
|
||||
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
|
||||
|
||||
return startedImpl.apply {
|
||||
@ -386,9 +382,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
|
||||
identity: PartyAndCertificate,
|
||||
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
|
||||
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
|
||||
networkMapClient: NetworkMapClient?,
|
||||
identity: PartyAndCertificate,
|
||||
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
|
||||
val keyPairs = mutableSetOf(identityKeyPair)
|
||||
|
||||
myNotaryIdentity = configuration.notary?.let {
|
||||
@ -402,7 +399,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
val nodeInfoWithBlankSerial = NodeInfo(
|
||||
val potentialNodeInfo = NodeInfo(
|
||||
myAddresses(),
|
||||
setOf(identity, myNotaryIdentity).filterNotNull(),
|
||||
versionInfo.platformVersion,
|
||||
@ -411,15 +408,49 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
|
||||
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
|
||||
|
||||
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
|
||||
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
|
||||
// The node info hasn't changed. We use the one from the database to preserve the serial.
|
||||
log.debug("Node-info hasn't changed")
|
||||
nodeInfoFromDb
|
||||
} else {
|
||||
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
|
||||
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
|
||||
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
|
||||
networkMapCache.addNode(newNodeInfo)
|
||||
log.info("New node-info: $newNodeInfo")
|
||||
newNodeInfo
|
||||
}
|
||||
|
||||
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
|
||||
val privateKey = keyPairs.single { it.public == publicKey }.private
|
||||
privateKey.sign(serialised.bytes)
|
||||
}
|
||||
|
||||
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
|
||||
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
|
||||
return Pair(keyPairs, nodeInfo)
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
|
||||
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
log.warn("Error encountered while publishing node info, will retry again", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, 1, TimeUnit.MINUTES)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
protected abstract fun myAddresses(): List<NetworkHostAndPort>
|
||||
|
||||
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
|
||||
|
@ -33,6 +33,7 @@ import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.artemis.ArtemisBroker
|
||||
import net.corda.node.internal.artemis.BrokerAddresses
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
@ -40,7 +41,7 @@ import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.config.shell.shellUser
|
||||
import net.corda.node.services.config.shell.localShellUser
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
@ -173,7 +174,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
val securityManagerConfig = configuration.security?.authService ?:
|
||||
SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
|
||||
|
||||
securityManager = RPCSecurityManagerImpl(if (configuration.shouldInitCrashShell()) securityManagerConfig.copyWithAdditionalUser(configuration.shellUser()) else securityManagerConfig)
|
||||
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
|
||||
if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this
|
||||
}
|
||||
|
||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker(networkParameters)
|
||||
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {
|
||||
|
@ -204,7 +204,7 @@ private object RPCPermissionResolver : PermissionResolver {
|
||||
}
|
||||
}
|
||||
|
||||
private class ShiroAuthorizingSubject(
|
||||
class ShiroAuthorizingSubject(
|
||||
private val subjectId: PrincipalCollection,
|
||||
private val manager: DefaultSecurityManager) : AuthorizingSubject {
|
||||
|
||||
@ -219,7 +219,7 @@ private fun buildCredentialMatcher(type: PasswordEncryption) = when (type) {
|
||||
PasswordEncryption.SHIRO_1_CRYPT -> PasswordMatcher()
|
||||
}
|
||||
|
||||
private class InMemoryRealm(users: List<User>,
|
||||
class InMemoryRealm(users: List<User>,
|
||||
realmId: String,
|
||||
passwordEncryption: PasswordEncryption = PasswordEncryption.NONE) : AuthorizingRealm() {
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
package net.corda.node.internal.security
|
||||
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import org.apache.shiro.mgt.DefaultSecurityManager
|
||||
import org.apache.shiro.subject.SimplePrincipalCollection
|
||||
import javax.security.auth.login.FailedLoginException
|
||||
|
||||
/**
|
||||
* Wrapper for [RPCSecurityManager] which creates in-memory [AuthorizingSubject] for [User].
|
||||
* Can be used to add on a specific [User] on top of the principals provided by the [RPCSecurityManager] realm.
|
||||
*/
|
||||
class RPCSecurityManagerWithAdditionalUser(private val delegate: RPCSecurityManager, private val user: User) : RPCSecurityManager by delegate {
|
||||
|
||||
private val realmId = user.username + "Realm"
|
||||
private val shellAuthorizingSubject = ShiroAuthorizingSubject(subjectId = SimplePrincipalCollection(user.username, id.value),
|
||||
manager = DefaultSecurityManager(InMemoryRealm(listOf(user), realmId)))
|
||||
|
||||
@Throws(FailedLoginException::class)
|
||||
override fun authenticate(principal: String, password: Password): AuthorizingSubject =
|
||||
if (user.username == principal && user.password == password.valueAsString) {
|
||||
shellAuthorizingSubject
|
||||
} else {
|
||||
delegate.authenticate(principal, password)
|
||||
}
|
||||
|
||||
override fun buildSubject(principal: String): AuthorizingSubject =
|
||||
if (user.username == principal) {
|
||||
shellAuthorizingSubject
|
||||
} else {
|
||||
delegate.buildSubject(principal)
|
||||
}
|
||||
}
|
@ -40,5 +40,4 @@ fun NodeConfiguration.toShellConfig(): ShellConfiguration {
|
||||
noLocalShell = this.noLocalShell)
|
||||
}
|
||||
|
||||
private fun localShellUser() = User("shell", "shell", setOf(Permissions.all()))
|
||||
fun NodeConfiguration.shellUser() = shouldInitCrashShell()?.let { localShellUser() }
|
||||
fun localShellUser() = User("shell", "shell", setOf(Permissions.all()))
|
||||
|
@ -25,12 +25,7 @@ import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
import net.corda.nodeapi.internal.network.ParametersUpdate
|
||||
import net.corda.nodeapi.internal.network.SignedNetworkParameters
|
||||
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import rx.Subscription
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Path
|
||||
@ -38,6 +33,7 @@ import java.nio.file.StandardCopyOption
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val fileWatcher: NodeInfoWatcher,
|
||||
@ -67,36 +63,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
|
||||
}
|
||||
|
||||
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
|
||||
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
|
||||
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
|
||||
// Compare node info without timestamp.
|
||||
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
|
||||
|
||||
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
|
||||
// Only publish and write to disk if there are changes to the node info.
|
||||
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
|
||||
fileWatcher.saveToFile(nodeInfoAndSigned)
|
||||
|
||||
if (networkMapClient != null) {
|
||||
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
|
||||
executor.submit(object : Runnable {
|
||||
override fun run() {
|
||||
try {
|
||||
networkMapClient.publish(signedNodeInfo)
|
||||
} catch (t: Throwable) {
|
||||
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
|
||||
// TODO: Exponential backoff?
|
||||
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fun subscribeToNetworkMap() {
|
||||
require(fileWatcherSubscription == null) { "Should not call this method twice." }
|
||||
// Subscribe to file based networkMap
|
||||
@ -124,17 +90,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
|
||||
|
||||
if (currentParametersHash != networkMap.networkParameterHash) {
|
||||
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
|
||||
if (acceptedHash == networkMap.networkParameterHash) {
|
||||
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
|
||||
} else {
|
||||
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Node will shutdown now. Please update node to use correct network parameters file.")
|
||||
}
|
||||
System.exit(1)
|
||||
exitOnParametersMismatch(networkMap)
|
||||
}
|
||||
|
||||
val currentNodeHashes = networkMapCache.allNodeHashes
|
||||
@ -161,6 +117,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
|
||||
return cacheTimeout
|
||||
}
|
||||
|
||||
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
|
||||
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
|
||||
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
|
||||
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
|
||||
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
|
||||
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
|
||||
0
|
||||
} else {
|
||||
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
|
||||
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
|
||||
"advertising: ${networkMap.networkParameterHash}.\n" +
|
||||
"Node will shutdown now. Please update node to use correct network parameters file.")
|
||||
1
|
||||
}
|
||||
exitProcess(exitCode)
|
||||
}
|
||||
|
||||
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
|
||||
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
|
||||
// This update was handled already.
|
||||
|
@ -19,7 +19,6 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
@ -31,7 +30,6 @@ import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotEquals
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class NodeTest {
|
||||
@ -77,16 +75,7 @@ class NodeTest {
|
||||
val node = Node(configuration, rigorousMock<VersionInfo>().also {
|
||||
doReturn(platformVersion).whenever(it).platformVersion
|
||||
}, initialiseSerialization = false)
|
||||
val nodeInfo = node.generateNodeInfo()
|
||||
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
|
||||
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
|
||||
assertEquals(platformVersion, nodeInfo.platformVersion)
|
||||
node.generateNodeInfo().let {
|
||||
assertNotEquals(nodeInfo, it) // Different serial.
|
||||
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
|
||||
}
|
||||
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
|
||||
assertEquals(nodeInfo, node.generateNodeInfo())
|
||||
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,9 +34,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.network.*
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.expect
|
||||
import net.corda.testing.core.expectEvents
|
||||
import net.corda.testing.core.sequence
|
||||
import net.corda.testing.internal.DEV_ROOT_CA
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
@ -66,7 +68,6 @@ class NetworkMapUpdaterTest {
|
||||
private val networkParametersHash = SecureHash.randomSHA256()
|
||||
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
|
||||
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
|
||||
private val nodeInfoBuilder = TestNodeInfoBuilder()
|
||||
private var parametersUpdate: ParametersUpdate? = null
|
||||
|
||||
@After
|
||||
@ -75,39 +76,6 @@ class NetworkMapUpdaterTest {
|
||||
fs.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `publish node info`() {
|
||||
nodeInfoBuilder.addIdentity(ALICE_NAME)
|
||||
|
||||
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
|
||||
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
|
||||
|
||||
// Publish node info for the first time.
|
||||
updater.updateNodeInfo(nodeInfo1AndSigned)
|
||||
// Sleep as publish is asynchronous.
|
||||
// TODO: Remove sleep in unit test
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
verify(networkMapClient, times(1)).publish(any())
|
||||
|
||||
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
|
||||
|
||||
// Publish the same node info, but with different serial.
|
||||
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(2L * cacheExpiryMs)
|
||||
|
||||
// Same node info should not publish twice
|
||||
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
|
||||
|
||||
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
|
||||
|
||||
// Publish different node info.
|
||||
updater.updateNodeInfo(differentNodeInfoAndSigned)
|
||||
// TODO: Remove sleep in unit test.
|
||||
Thread.sleep(200)
|
||||
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `process add node updates from network map, with additional node infos from dir`() {
|
||||
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")
|
||||
|
@ -33,8 +33,8 @@ the time controls at the top left of the home page to run the fixings. Click any
|
||||
view it.
|
||||
|
||||
*Note:* The IRS web UI currently has a bug when changing the clock time where it may show no numbers or apply fixings
|
||||
inconsistently. The issues will be addressed in a future milestone release. Meanwhile, you can take a look at a simpler
|
||||
oracle example here: https://github.com/corda/oracle-example.
|
||||
inconsistently. The issues will be addressed in a future release. Meanwhile, you can take a look at a simpler oracle
|
||||
example here: https://github.com/corda/oracle-example.
|
||||
|
||||
## Running the system test
|
||||
|
||||
|
@ -83,6 +83,7 @@ import java.time.Instant
|
||||
import java.time.ZoneOffset.UTC
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.TimeUnit
|
||||
@ -111,7 +112,7 @@ class DriverDSLImpl(
|
||||
override val shutdownManager get() = _shutdownManager!!
|
||||
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
|
||||
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
|
||||
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
|
||||
private val countObservables = ConcurrentHashMap<CordaX500Name, Observable<Int>>()
|
||||
private val nodeNames = mutableSetOf<CordaX500Name>()
|
||||
/**
|
||||
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts
|
||||
@ -581,15 +582,17 @@ class DriverDSLImpl(
|
||||
}
|
||||
|
||||
/**
|
||||
* @nodeName the name of the node which performs counting
|
||||
* @param initial number of nodes currently in the network map of a running node.
|
||||
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
|
||||
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
|
||||
* the initial value emitted is always [initial]
|
||||
*/
|
||||
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
|
||||
private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
|
||||
ConnectableObservable<Int> {
|
||||
val count = AtomicInteger(initial)
|
||||
return networkMapCacheChangeObservable.map {
|
||||
log.debug("nodeCountObservable for '$nodeName' received '$it'")
|
||||
when (it) {
|
||||
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
|
||||
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
|
||||
@ -605,8 +608,9 @@ class DriverDSLImpl(
|
||||
*/
|
||||
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
|
||||
val (snapshot, updates) = rpc.networkMapFeed()
|
||||
val counterObservable = nodeCountObservable(snapshot.size, updates)
|
||||
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
|
||||
val nodeName = rpc.nodeInfo().legalIdentities[0].name
|
||||
val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates)
|
||||
countObservables[nodeName] = counterObservable
|
||||
/* TODO: this might not always be the exact number of nodes one has to wait for,
|
||||
* for example in the following sequence
|
||||
* 1 start 3 nodes in order, A, B, C.
|
||||
@ -617,6 +621,7 @@ class DriverDSLImpl(
|
||||
|
||||
// This is an observable which yield the minimum number of nodes in each node network map.
|
||||
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
|
||||
log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}")
|
||||
args.map { it as Int }.min() ?: 0
|
||||
}
|
||||
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
|
||||
@ -707,7 +712,8 @@ class DriverDSLImpl(
|
||||
if (it == processDeathFuture) {
|
||||
throw ListenProcessDeathException(config.corda.p2pAddress, process)
|
||||
}
|
||||
processDeathFuture.cancel(false)
|
||||
// Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap.
|
||||
processDeathFuture.cancel(true)
|
||||
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
|
||||
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ import javax.ws.rs.core.Response
|
||||
import javax.ws.rs.core.Response.ok
|
||||
import javax.ws.rs.core.Response.status
|
||||
|
||||
class NetworkMapServer(private val cacheTimeout: Duration,
|
||||
class NetworkMapServer(private val pollInterval: Duration,
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(),
|
||||
private val myHostNameValue: String = "test.host.name",
|
||||
@ -147,7 +147,7 @@ class NetworkMapServer(private val cacheTimeout: Duration,
|
||||
fun getNetworkMap(): Response {
|
||||
val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate)
|
||||
val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap)
|
||||
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build()
|
||||
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${pollInterval.seconds}").build()
|
||||
}
|
||||
|
||||
// Remove nodeInfo for testing.
|
||||
@ -187,4 +187,4 @@ class NetworkMapServer(private val cacheTimeout: Duration,
|
||||
@Path("my-hostname")
|
||||
fun getHostName(): Response = Response.ok(myHostNameValue).build()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user