Merge pull request #680 from corda/shams-os-merge-0af42bd

O/S merge from 0af42bd
This commit is contained in:
Shams Asari 2018-04-04 17:38:18 +01:00 committed by GitHub
commit 4c123643ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 180 additions and 236 deletions

View File

@ -176,8 +176,8 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
controlMessage.keyStorePrivateKeyPassword,
controlMessage.trustStoreBytes,
controlMessage.trustStorePassword)
forwardAddress = receivedMessage.destinationLink
forwardLegalName = receivedMessage.destinationLegalName
forwardAddress = receivedMessage.sourceLink
forwardLegalName = receivedMessage.sourceLegalName
}
is DeactivateFloat -> {
log.info("Received Tunnel Deactivate message")

View File

@ -1,12 +1,12 @@
Changelog
=========
Here's a summary of what's changed in each Corda release. For guidance on how to upgrade code from the previous
release, see :doc:`upgrade-notes`.
Unreleased
----------
Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code
from the previous milestone release.
* Errors thrown by a Corda node will now reported to a calling RPC client with attention to serialization and obfuscation of internal data.
* Serializing an inner class (non-static nested class in Java, inner class in Kotlin) will be rejected explicitly by the serialization

View File

@ -369,9 +369,9 @@ internal class ConnectionStateMachine(serverMode: Boolean,
payload,
link.source.address,
remoteLegalName,
NetworkHostAndPort(localAddress.hostString, localAddress.port),
localLegalName,
NetworkHostAndPort(remoteAddress.hostString, remoteAddress.port),
localLegalName,
NetworkHostAndPort(localAddress.hostString, localAddress.port),
appProperties,
channel,
delivery)

View File

@ -149,10 +149,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
is SendableMessageImpl -> {
val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port)
require(inetAddress == remoteAddress) {
"Message for incorrect endpoint"
"Message for incorrect endpoint $inetAddress expected $remoteAddress"
}
require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) {
"Message for incorrect legal identity"
"Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}"
}
log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" }
eventProcessor!!.transportWriteMessage(msg)

View File

@ -12,11 +12,9 @@ package net.corda.node.utilities.registration
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.logElapsedTime
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.*
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
@ -62,6 +60,7 @@ class NodeRegistrationTest : IntegrationTest() {
private val notaryName = CordaX500Name("NotaryService", "Zurich", "CH")
private val aliceName = CordaX500Name("Alice", "London", "GB")
private val genevieveName = CordaX500Name("Genevieve", "London", "GB")
private val log = contextLogger()
}
@Rule
@ -76,7 +75,7 @@ class NodeRegistrationTest : IntegrationTest() {
@Before
fun startServer() {
server = NetworkMapServer(
cacheTimeout = 1.minutes,
pollInterval = 1.seconds,
hostAndPort = portAllocation.nextHostAndPort(),
myHostNameValue = "localhost",
additionalServices = registrationHandler)
@ -106,6 +105,9 @@ class NodeRegistrationTest : IntegrationTest() {
startNode(providedName = genevieveName),
defaultNotaryNode
).transpose().getOrThrow()
log.info("Nodes started")
val (alice, genevieve) = nodes
assertThat(registrationHandler.idsPolled).containsOnly(
@ -132,10 +134,15 @@ class RegistrationHandler(private val rootCertAndKeyPair: CertificateAndKeyPair)
private val certPaths = HashMap<String, CertPath>()
val idsPolled = HashSet<String>()
companion object {
val log = loggerFor<RegistrationHandler>()
}
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Produces(MediaType.TEXT_PLAIN)
fun registration(input: InputStream): Response {
return log.logElapsedTime("Registration") {
val certificationRequest = input.use { JcaPKCS10CertificationRequest(it.readBytes()) }
val (certPath, name) = createSignedClientCertificate(
certificationRequest,
@ -143,14 +150,17 @@ class RegistrationHandler(private val rootCertAndKeyPair: CertificateAndKeyPair)
listOf(rootCertAndKeyPair.certificate))
require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" }
certPaths[name.organisation] = certPath
return Response.ok(name.organisation).build()
Response.ok(name.organisation).build()
}
}
@GET
@Path("{id}")
fun reply(@PathParam("id") id: String): Response {
return log.logElapsedTime("Reply by Id") {
idsPolled += id
return buildResponse(certPaths[id]!!.certificates)
buildResponse(certPaths[id]!!.certificates)
}
}
private fun buildResponse(certificates: List<Certificate>): Response {

View File

@ -19,16 +19,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sign
import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryChangeFlow
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.StartableByService
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -37,24 +28,9 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.RPCOps
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.messaging.*
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
@ -74,75 +50,33 @@ import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.DummyAuditService
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.api.*
import net.corda.node.services.config.*
import net.corda.node.services.config.shell.toShellConfig
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import net.corda.node.services.persistence.RunOnceService
import net.corda.node.services.network.*
import net.corda.node.services.persistence.*
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.appName
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.MySQLNonValidatingNotaryService
import net.corda.node.services.transactions.MySQLValidatingNotaryService
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.*
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NamedThreadFactory
import net.corda.node.utilities.NodeBuildProperties
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.nodeapi.internal.persistence.isH2Database
import net.corda.nodeapi.internal.persistence.*
import net.corda.nodeapi.internal.storeLegalIdentity
import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch
@ -165,6 +99,7 @@ import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.collections.set
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
@ -222,7 +157,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val runOnStop = ArrayList<() -> Any?>()
private val _nodeReadyFuture = openFuture<Unit>()
protected var networkMapClient: NetworkMapClient? = null
protected lateinit var networkMapUpdater: NetworkMapUpdater
private lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor()
@ -261,15 +196,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
nodeInfo
}
}
@ -283,15 +214,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val identityService = makeIdentityService(identity.certificate)
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
}
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry()
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
@ -342,6 +276,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
startShell()
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient,
@ -349,12 +284,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
configuration.baseDirectory)
runOnStop += networkMapUpdater::close
log.info("Node-info for this node: ${services.myInfo}")
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
}
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit.
@ -386,7 +315,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
networkMapClient: NetworkMapClient?,
identity: PartyAndCertificate,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
val keyPairs = mutableSetOf(identityKeyPair)
@ -402,7 +332,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
val nodeInfoWithBlankSerial = NodeInfo(
val potentialNodeInfo = NodeInfo(
myAddresses(),
setOf(identity, myNotaryIdentity).filterNotNull(),
versionInfo.platformVersion,
@ -411,15 +341,49 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
// The node info hasn't changed. We use the one from the database to preserve the serial.
log.debug("Node-info hasn't changed")
nodeInfoFromDb
} else {
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
networkMapCache.addNode(newNodeInfo)
log.info("New node-info: $newNodeInfo")
newNodeInfo
}
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
return Pair(keyPairs, nodeInfo)
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
log.warn("Error encountered while publishing node info, will retry again", t)
// TODO: Exponential backoff?
executor.schedule(this, 1, TimeUnit.MINUTES)
}
}
})
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {

View File

@ -33,6 +33,7 @@ import net.corda.node.VersionInfo
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.api.NodePropertiesStore
@ -40,7 +41,7 @@ import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.config.shell.shellUser
import net.corda.node.services.config.shell.localShellUser
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.messaging.*
import net.corda.node.services.rpc.ArtemisRpcBroker
@ -173,7 +174,9 @@ open class Node(configuration: NodeConfiguration,
val securityManagerConfig = configuration.security?.authService ?:
SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
securityManager = RPCSecurityManagerImpl(if (configuration.shouldInitCrashShell()) securityManagerConfig.copyWithAdditionalUser(configuration.shellUser()) else securityManagerConfig)
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this
}
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker(networkParameters)
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {

View File

@ -204,7 +204,7 @@ private object RPCPermissionResolver : PermissionResolver {
}
}
private class ShiroAuthorizingSubject(
class ShiroAuthorizingSubject(
private val subjectId: PrincipalCollection,
private val manager: DefaultSecurityManager) : AuthorizingSubject {
@ -219,7 +219,7 @@ private fun buildCredentialMatcher(type: PasswordEncryption) = when (type) {
PasswordEncryption.SHIRO_1_CRYPT -> PasswordMatcher()
}
private class InMemoryRealm(users: List<User>,
class InMemoryRealm(users: List<User>,
realmId: String,
passwordEncryption: PasswordEncryption = PasswordEncryption.NONE) : AuthorizingRealm() {

View File

@ -0,0 +1,32 @@
package net.corda.node.internal.security
import net.corda.nodeapi.internal.config.User
import org.apache.shiro.mgt.DefaultSecurityManager
import org.apache.shiro.subject.SimplePrincipalCollection
import javax.security.auth.login.FailedLoginException
/**
* Wrapper for [RPCSecurityManager] which creates in-memory [AuthorizingSubject] for [User].
* Can be used to add on a specific [User] on top of the principals provided by the [RPCSecurityManager] realm.
*/
class RPCSecurityManagerWithAdditionalUser(private val delegate: RPCSecurityManager, private val user: User) : RPCSecurityManager by delegate {
private val realmId = user.username + "Realm"
private val shellAuthorizingSubject = ShiroAuthorizingSubject(subjectId = SimplePrincipalCollection(user.username, id.value),
manager = DefaultSecurityManager(InMemoryRealm(listOf(user), realmId)))
@Throws(FailedLoginException::class)
override fun authenticate(principal: String, password: Password): AuthorizingSubject =
if (user.username == principal && user.password == password.valueAsString) {
shellAuthorizingSubject
} else {
delegate.authenticate(principal, password)
}
override fun buildSubject(principal: String): AuthorizingSubject =
if (user.username == principal) {
shellAuthorizingSubject
} else {
delegate.buildSubject(principal)
}
}

View File

@ -40,5 +40,4 @@ fun NodeConfiguration.toShellConfig(): ShellConfiguration {
noLocalShell = this.noLocalShell)
}
private fun localShellUser() = User("shell", "shell", setOf(Permissions.all()))
fun NodeConfiguration.shellUser() = shouldInitCrashShell()?.let { localShellUser() }
fun localShellUser() = User("shell", "shell", setOf(Permissions.all()))

View File

@ -25,12 +25,7 @@ import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.nodeapi.internal.network.*
import rx.Subscription
import rx.subjects.PublishSubject
import java.nio.file.Path
@ -38,6 +33,7 @@ import java.nio.file.StandardCopyOption
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
@ -67,36 +63,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
// Compare node info without timestamp.
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
// Only publish and write to disk if there are changes to the node info.
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
fileWatcher.saveToFile(nodeInfoAndSigned)
if (networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
}
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
executor.submit(object : Runnable {
override fun run() {
try {
networkMapClient.publish(signedNodeInfo)
} catch (t: Throwable) {
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
// TODO: Exponential backoff?
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
}
}
})
}
fun subscribeToNetworkMap() {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
// Subscribe to file based networkMap
@ -124,17 +90,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
if (currentParametersHash != networkMap.networkParameterHash) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
}
System.exit(1)
exitOnParametersMismatch(networkMap)
}
val currentNodeHashes = networkMapCache.allNodeHashes
@ -161,6 +117,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
return cacheTimeout
}
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
0
} else {
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
"advertising: ${networkMap.networkParameterHash}.\n" +
"Node will shutdown now. Please update node to use correct network parameters file.")
1
}
exitProcess(exitCode)
}
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
// This update was handled already.

View File

@ -19,7 +19,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -31,7 +30,6 @@ import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Files
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertNull
class NodeTest {
@ -77,16 +75,7 @@ class NodeTest {
val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false)
val nodeInfo = node.generateNodeInfo()
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
assertEquals(platformVersion, nodeInfo.platformVersion)
node.generateNodeInfo().let {
assertNotEquals(nodeInfo, it) // Different serial.
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
}
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
assertEquals(nodeInfo, node.generateNodeInfo())
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
}
}
}

View File

@ -34,9 +34,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.network.*
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.expect
import net.corda.testing.core.expectEvents
import net.corda.testing.core.sequence
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestNodeInfoBuilder
import net.corda.testing.internal.createNodeInfoAndSigned
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -66,7 +68,6 @@ class NetworkMapUpdaterTest {
private val networkParametersHash = SecureHash.randomSHA256()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler)
private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir)
private val nodeInfoBuilder = TestNodeInfoBuilder()
private var parametersUpdate: ParametersUpdate? = null
@After
@ -75,39 +76,6 @@ class NetworkMapUpdaterTest {
fs.close()
}
@Test
fun `publish node info`() {
nodeInfoBuilder.addIdentity(ALICE_NAME)
val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned()
val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis())
// Publish node info for the first time.
updater.updateNodeInfo(nodeInfo1AndSigned)
// Sleep as publish is asynchronous.
// TODO: Remove sleep in unit test
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapClient, times(1)).publish(any())
networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo)
// Publish the same node info, but with different serial.
updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// Same node info should not publish twice
verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed)
val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob")
// Publish different node info.
updater.updateNodeInfo(differentNodeInfoAndSigned)
// TODO: Remove sleep in unit test.
Thread.sleep(200)
verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed)
}
@Test
fun `process add node updates from network map, with additional node infos from dir`() {
val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1")

View File

@ -33,8 +33,8 @@ the time controls at the top left of the home page to run the fixings. Click any
view it.
*Note:* The IRS web UI currently has a bug when changing the clock time where it may show no numbers or apply fixings
inconsistently. The issues will be addressed in a future milestone release. Meanwhile, you can take a look at a simpler
oracle example here: https://github.com/corda/oracle-example.
inconsistently. The issues will be addressed in a future release. Meanwhile, you can take a look at a simpler oracle
example here: https://github.com/corda/oracle-example.
## Running the system test

View File

@ -83,6 +83,7 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
@ -111,7 +112,7 @@ class DriverDSLImpl(
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private val countObservables = ConcurrentHashMap<CordaX500Name, Observable<Int>>()
private val nodeNames = mutableSetOf<CordaX500Name>()
/**
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts
@ -581,15 +582,17 @@ class DriverDSLImpl(
}
/**
* @nodeName the name of the node which performs counting
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map {
log.debug("nodeCountObservable for '$nodeName' received '$it'")
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
@ -605,8 +608,9 @@ class DriverDSLImpl(
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
val nodeName = rpc.nodeInfo().legalIdentities[0].name
val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates)
countObservables[nodeName] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
@ -617,6 +621,7 @@ class DriverDSLImpl(
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}")
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
@ -707,7 +712,8 @@ class DriverDSLImpl(
if (it == processDeathFuture) {
throw ListenProcessDeathException(config.corda.p2pAddress, process)
}
processDeathFuture.cancel(false)
// Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap.
processDeathFuture.cancel(true)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
}

View File

@ -42,7 +42,7 @@ import javax.ws.rs.core.Response
import javax.ws.rs.core.Response.ok
import javax.ws.rs.core.Response.status
class NetworkMapServer(private val cacheTimeout: Duration,
class NetworkMapServer(private val pollInterval: Duration,
hostAndPort: NetworkHostAndPort,
private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(),
private val myHostNameValue: String = "test.host.name",
@ -147,7 +147,7 @@ class NetworkMapServer(private val cacheTimeout: Duration,
fun getNetworkMap(): Response {
val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate)
val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap)
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build()
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${pollInterval.seconds}").build()
}
// Remove nodeInfo for testing.