mirror of
https://github.com/corda/corda.git
synced 2025-04-15 15:07:03 +00:00
Merge remote-tracking branch 'remotes/open/master' into parkri-os-merge-20180809-1
This commit is contained in:
commit
e508033201
@ -287,6 +287,13 @@ absolute path to the node's base directory.
|
||||
|
||||
:flowMonitorSuspensionLoggingThresholdMillis: Threshold ``Duration`` suspended flows waiting for IO need to exceed before they are logged. Default value is ``60 seconds``.
|
||||
|
||||
:jmxReporterType: Provides an option for registering an alternative JMX reporter. Available options are ``JOLOKIA`` and ``NEW_RELIC``. If no value is provided, ``JOLOKIA`` will be used.
|
||||
|
||||
.. note:: The Jolokia configuration is provided by default. The New Relic configuration leverages the Dropwizard_ NewRelicReporter solution. See `Introduction to New Relic for Java`_ for details on how to get started and how to install the New Relic Java agent.
|
||||
|
||||
.. _Dropwizard: https://metrics.dropwizard.io/3.2.3/manual/third-party.html
|
||||
.. _Introduction to New Relic for Java: https://docs.newrelic.com/docs/agents/java-agent/getting-started/introduction-new-relic-java
|
||||
|
||||
:enterpriseConfiguration: Allows fine-grained controls of various features only available in the enterprise version of Corda.
|
||||
|
||||
:tuning: Performance tuning parameters for Corda Enterprise
|
||||
|
@ -117,8 +117,9 @@ The current set of network parameters:
|
||||
|
||||
:epoch: Version number of the network parameters. Starting from 1, this will always increment whenever any of the
|
||||
parameters change.
|
||||
|
||||
:whitelistedContractImplementations: List of whitelisted versions of contract code.
|
||||
For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract.
|
||||
For each contract class there is a list of SHA-256 hashes of the approved CorDapp jar versions containing that contract.
|
||||
Read more about *Zone constraints* here :doc:`api-contract-constraints`
|
||||
|
||||
:eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their
|
||||
|
@ -357,7 +357,7 @@ class NetworkBootstrapper
|
||||
}
|
||||
} else {
|
||||
NetworkParameters(
|
||||
minimumPlatformVersion = 1,
|
||||
minimumPlatformVersion = 4,
|
||||
notaries = notaryInfos,
|
||||
modifiedTime = Instant.now(),
|
||||
maxMessageSize = 10485760,
|
||||
|
@ -237,6 +237,8 @@ dependencies {
|
||||
|
||||
// Jolokia JVM monitoring agent, required to push logs through slf4j
|
||||
compile "org.jolokia:jolokia-jvm:${jolokia_version}:agent"
|
||||
// Optional New Relic JVM reporter, used to push metrics to the configured account associated with a newrelic.yml configuration. See https://mvnrepository.com/artifact/com.palominolabs.metrics/metrics-new-relic
|
||||
compile group: 'com.palominolabs.metrics', name: 'metrics-new-relic', version: '1.1.1'
|
||||
|
||||
// Allow access to simple SOCKS Server for integration testing
|
||||
testCompile("io.netty:netty-example:$netty_version") {
|
||||
|
@ -91,7 +91,7 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeInternalTestDataSourceProperties(configSupplier = { ConfigFactory.empty() }), DatabaseConfig(runMigration = true), { null }, { null })
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database).apply { start(emptyList()) }
|
||||
val persistentNetworkMapCache = PersistentNetworkMapCache(database, ALICE_NAME).apply { start(emptyList()) }
|
||||
networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, rigorousMock(), database).apply { start() }
|
||||
}
|
||||
|
||||
|
@ -10,127 +10,155 @@
|
||||
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.node.internal.NodeWithInfo
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.node.internal.NodeBasedTest
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
|
||||
import org.junit.After
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
// TODO Clean up these tests, they were written with old network map design in place.
|
||||
class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
companion object {
|
||||
val ALICE = TestIdentity(ALICE_NAME, 70).party
|
||||
val BOB = TestIdentity(BOB_NAME, 80).party
|
||||
val DUMMY_REGULATOR = TestIdentity(CordaX500Name("Regulator A", "Paris", "FR"), 100).party
|
||||
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(DUMMY_REGULATOR.name.toDatabaseSchemaName(), ALICE.name.toDatabaseSchemaName(),
|
||||
BOB.name.toDatabaseSchemaName())
|
||||
class PersistentNetworkMapCacheTest {
|
||||
private companion object {
|
||||
val ALICE = TestIdentity(ALICE_NAME, 70)
|
||||
val BOB = TestIdentity(BOB_NAME, 80)
|
||||
val CHARLIE = TestIdentity(CHARLIE_NAME, 90)
|
||||
}
|
||||
private val partiesList = listOf(DUMMY_REGULATOR, ALICE, BOB)
|
||||
private val addressesMap = HashMap<CordaX500Name, NetworkHostAndPort>()
|
||||
private val infos = HashSet<NodeInfo>()
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
val nodes = startNodesWithPort(partiesList)
|
||||
nodes.forEach {
|
||||
infos.add(it.info)
|
||||
addressesMap[it.info.singleIdentity().name] = it.info.addresses[0]
|
||||
it.dispose() // We want them to communicate with NetworkMapService to save data to cache.
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private var portCounter = 1000
|
||||
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
|
||||
private val charlieNetMapCache = PersistentNetworkMapCache(database, CHARLIE.name)
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun addNode() {
|
||||
val alice = createNodeInfo(listOf(ALICE))
|
||||
charlieNetMapCache.addNode(alice)
|
||||
assertThat(charlieNetMapCache.nodeReady).isDone()
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}",
|
||||
NodeInfoSchemaV1.PersistentNodeInfo::class.java
|
||||
).resultList.map { it.toNodeInfo() }
|
||||
}
|
||||
assertThat(fromDb).containsOnly(alice)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `adding the node's own node-info doesn't complete the nodeReady future`() {
|
||||
val charlie = createNodeInfo(listOf(CHARLIE))
|
||||
charlieNetMapCache.addNode(charlie)
|
||||
assertThat(charlieNetMapCache.nodeReady).isNotDone()
|
||||
assertThat(charlieNetMapCache.getNodeByLegalName(CHARLIE.name)).isEqualTo(charlie)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `starting with just the node's own node-info in the db`() {
|
||||
val charlie = createNodeInfo(listOf(CHARLIE))
|
||||
saveNodeInfoIntoDb(charlie)
|
||||
assertThat(charlieNetMapCache.allNodes).containsOnly(charlie)
|
||||
charlieNetMapCache.start(emptyList())
|
||||
assertThat(charlieNetMapCache.nodeReady).isNotDone()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `starting with another node-info in the db`() {
|
||||
val alice = createNodeInfo(listOf(ALICE))
|
||||
saveNodeInfoIntoDb(alice)
|
||||
assertThat(charlieNetMapCache.allNodes).containsOnly(alice)
|
||||
charlieNetMapCache.start(emptyList())
|
||||
assertThat(charlieNetMapCache.nodeReady).isDone()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `unknown legal name`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netMapCache = alice.services.networkMapCache
|
||||
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
|
||||
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
|
||||
assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
|
||||
assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
assertThat(charlieNetMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `nodes in distributed service`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netMapCache = alice.services.networkMapCache
|
||||
charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE)))
|
||||
|
||||
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME)
|
||||
|
||||
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
|
||||
val distServiceNodeInfos = (1..2).map {
|
||||
val nodeInfo = NodeInfo(
|
||||
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
|
||||
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
|
||||
platformVersion = 3,
|
||||
serial = 1
|
||||
)
|
||||
netMapCache.addNode(nodeInfo)
|
||||
val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity))
|
||||
charlieNetMapCache.addNode(nodeInfo)
|
||||
nodeInfo
|
||||
}
|
||||
|
||||
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
|
||||
assertThatExceptionOfType(IllegalArgumentException::class.java)
|
||||
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
|
||||
assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
|
||||
assertThatIllegalArgumentException()
|
||||
.isThrownBy { charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
|
||||
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `get nodes by owning key and by name`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netCache = alice.services.networkMapCache
|
||||
val res = netCache.getNodeByLegalIdentity(alice.info.singleIdentity())
|
||||
assertEquals(alice.info, res)
|
||||
val res2 = netCache.getNodeByLegalName(DUMMY_REGULATOR.name)
|
||||
assertEquals(infos.singleOrNull { DUMMY_REGULATOR.name in it.legalIdentities.map { it.name } }, res2)
|
||||
val alice = createNodeInfo(listOf(ALICE))
|
||||
charlieNetMapCache.addNode(alice)
|
||||
assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice)
|
||||
assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `get nodes by address`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val netCache = alice.services.networkMapCache
|
||||
val res = netCache.getNodeByAddress(alice.info.addresses[0])
|
||||
assertEquals(alice.info, res)
|
||||
val alice = createNodeInfo(listOf(ALICE))
|
||||
charlieNetMapCache.addNode(alice)
|
||||
assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice)
|
||||
}
|
||||
|
||||
// This test has to be done as normal node not mock, because MockNodes don't have addresses.
|
||||
@Test
|
||||
fun `insert two node infos with the same host and port`() {
|
||||
val aliceNode = startNode(ALICE_NAME)
|
||||
val charliePartyCert = getTestPartyAndCertificate(CHARLIE_NAME, generateKeyPair().public)
|
||||
val aliceCache = aliceNode.services.networkMapCache
|
||||
aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(charliePartyCert)))
|
||||
val res = aliceCache.allNodes.filter { aliceNode.info.addresses[0] in it.addresses }
|
||||
assertEquals(2, res.size)
|
||||
val alice = createNodeInfo(listOf(ALICE))
|
||||
charlieNetMapCache.addNode(alice)
|
||||
val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0])
|
||||
charlieNetMapCache.addNode(bob)
|
||||
val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses }
|
||||
assertThat(nodeInfos).hasSize(2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `restart node with DB map cache`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE))[0]
|
||||
val partyNodes = alice.services.networkMapCache.allNodes
|
||||
assertEquals(infos.size, partyNodes.size)
|
||||
assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet())
|
||||
private fun createNodeInfo(identities: List<TestIdentity>,
|
||||
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
|
||||
return NodeInfo(
|
||||
addresses = listOf(address),
|
||||
legalIdentitiesAndCerts = identities.map { it.identity },
|
||||
platformVersion = 3,
|
||||
serial = 1
|
||||
)
|
||||
}
|
||||
|
||||
// HELPERS
|
||||
// Helper function to restart nodes with the same host and port.
|
||||
private fun startNodesWithPort(nodesToStart: List<Party>, customRetryIntervalMs: Long? = null): List<NodeWithInfo> {
|
||||
return nodesToStart.map { party ->
|
||||
val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
|
||||
(customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
|
||||
startNode(party.name, configOverrides = configOverrides)
|
||||
private fun saveNodeInfoIntoDb(nodeInfo: NodeInfo) {
|
||||
database.transaction {
|
||||
session.save(NodeInfoSchemaV1.PersistentNodeInfo(
|
||||
id = 0,
|
||||
hash = nodeInfo.serialize().hash.toString(),
|
||||
addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) },
|
||||
legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem ->
|
||||
NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0)
|
||||
},
|
||||
platformVersion = nodeInfo.platformVersion,
|
||||
serial = nodeInfo.serial
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
// TODO Break cyclic dependency
|
||||
identityService.database = database
|
||||
}
|
||||
private val persistentNetworkMapCache = PersistentNetworkMapCache(database)
|
||||
private val persistentNetworkMapCache = PersistentNetworkMapCache(database, configuration.myLegalName)
|
||||
val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize()
|
||||
val checkpointStorage = DBCheckpointStorage()
|
||||
@Suppress("LeakingThis")
|
||||
@ -262,7 +262,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
open fun generateAndSaveNodeInfo(): NodeInfo {
|
||||
check(started == null) { "Node has already been started" }
|
||||
log.info("Generating nodeInfo ...")
|
||||
persistentNetworkMapCache.start(notaries = emptyList())
|
||||
val trustRoot = initKeyStore()
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
startDatabase()
|
||||
@ -270,6 +269,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
|
||||
return database.use {
|
||||
it.transaction {
|
||||
persistentNetworkMapCache.start(notaries = emptyList())
|
||||
val (_, nodeInfoAndSigned) = updateNodeInfo(identity, identityKeyPair, publish = false)
|
||||
nodeInfoAndSigned.nodeInfo
|
||||
}
|
||||
@ -279,9 +279,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
fun clearNetworkMapCache() {
|
||||
Node.printBasicNodeInfo("Clearing network map cache entries")
|
||||
log.info("Starting clearing of network map cache entries...")
|
||||
persistentNetworkMapCache.start(notaries = emptyList())
|
||||
startDatabase()
|
||||
database.use {
|
||||
persistentNetworkMapCache.start(notaries = emptyList())
|
||||
persistentNetworkMapCache.clearNetworkMapCache()
|
||||
}
|
||||
}
|
||||
@ -317,7 +317,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
"Node's platform version is lower than network's required minimumPlatformVersion"
|
||||
}
|
||||
servicesForResolution.start(netParams)
|
||||
persistentNetworkMapCache.start(netParams.notaries)
|
||||
|
||||
startDatabase()
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
@ -339,6 +338,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
val (keyPairs, nodeInfoAndSigned, myNotaryIdentity) = database.transaction {
|
||||
persistentNetworkMapCache.start(netParams.notaries)
|
||||
networkMapCache.start()
|
||||
updateNodeInfo(identity, identityKeyPair, publish = true)
|
||||
}
|
||||
|
@ -11,6 +11,10 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import com.codahale.metrics.MetricFilter
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.palominolabs.metrics.newrelic.AllEnabledMetricAttributeFilter
|
||||
import com.palominolabs.metrics.newrelic.NewRelicReporter
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.FlowLogic
|
||||
@ -52,6 +56,7 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.SecurityConfiguration
|
||||
import net.corda.node.services.config.shouldInitCrashShell
|
||||
import net.corda.node.services.config.shouldStartLocalShell
|
||||
import net.corda.node.services.config.JmxReporterType
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
@ -74,11 +79,12 @@ import rx.schedulers.Schedulers
|
||||
import java.net.BindException
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
import kotlin.system.exitProcess
|
||||
import java.nio.file.Paths
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class NodeWithInfo(val node: Node, val info: NodeInfo) {
|
||||
val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by node.services, FlowStarter by node.flowStarter {}
|
||||
@ -411,18 +417,8 @@ open class Node(configuration: NodeConfiguration,
|
||||
val nodeInfo: NodeInfo = super.start()
|
||||
nodeReadyFuture.thenMatch({
|
||||
serverThread.execute {
|
||||
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
|
||||
//
|
||||
// https://jolokia.org/agent/jvm.html
|
||||
JmxReporter.forRegistry(services.monitoringService.metrics).inDomain("net.corda").createsObjectNamesWith { _, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
}.build().start()
|
||||
|
||||
registerJmxReporter(services.monitoringService.metrics)
|
||||
|
||||
_startupComplete.set(Unit)
|
||||
}
|
||||
@ -435,6 +431,47 @@ open class Node(configuration: NodeConfiguration,
|
||||
return nodeInfo
|
||||
}
|
||||
|
||||
/**
|
||||
* A hook to allow configuration override of the JmxReporter being used.
|
||||
*/
|
||||
fun registerJmxReporter(metrics: MetricRegistry) {
|
||||
log.info("Registering JMX reporter:")
|
||||
when (configuration.jmxReporterType) {
|
||||
JmxReporterType.JOLOKIA -> registerJolokiaReporter(metrics)
|
||||
JmxReporterType.NEW_RELIC -> registerNewRelicReporter(metrics)
|
||||
}
|
||||
}
|
||||
|
||||
private fun registerJolokiaReporter(registry: MetricRegistry) {
|
||||
log.info("Registering Jolokia JMX reporter:")
|
||||
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
|
||||
//
|
||||
// https://jolokia.org/agent/jvm.html
|
||||
JmxReporter.forRegistry(registry).inDomain("net.corda").createsObjectNamesWith { _, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
}.build().start()
|
||||
}
|
||||
|
||||
private fun registerNewRelicReporter (registry: MetricRegistry) {
|
||||
log.info("Registering New Relic JMX Reporter:")
|
||||
val reporter = NewRelicReporter.forRegistry(registry)
|
||||
.name("New Relic Reporter")
|
||||
.filter(MetricFilter.ALL)
|
||||
.attributeFilter(AllEnabledMetricAttributeFilter())
|
||||
.rateUnit(TimeUnit.SECONDS)
|
||||
.durationUnit(TimeUnit.MILLISECONDS)
|
||||
.metricNamePrefix("corda/")
|
||||
.build()
|
||||
|
||||
reporter.start(1, TimeUnit.MINUTES)
|
||||
}
|
||||
|
||||
override val rxIoScheduler: Scheduler get() = Schedulers.io()
|
||||
|
||||
private fun initialiseSerialization() {
|
||||
|
@ -86,6 +86,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS
|
||||
val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
|
||||
val cordappDirectories: List<Path> get() = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
|
||||
val jmxReporterType : JmxReporterType? get() = defaultJmxReporterType
|
||||
|
||||
fun validate(): List<String>
|
||||
|
||||
@ -102,9 +103,18 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
const val defaultAttachmentCacheBound = 1024L
|
||||
|
||||
const val cordappDirectoriesKey = "cordappDirectories"
|
||||
|
||||
val defaultJmxReporterType = JmxReporterType.JOLOKIA
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Currently registered JMX Reporters
|
||||
*/
|
||||
enum class JmxReporterType {
|
||||
JOLOKIA, NEW_RELIC
|
||||
}
|
||||
|
||||
data class DevModeOptions(val disableCheckpointChecker: Boolean = false, val allowCompatibilityZone: Boolean = false)
|
||||
|
||||
data class GraphiteOptions(
|
||||
@ -268,7 +278,8 @@ data class NodeConfigurationImpl(
|
||||
private val jarDirs: List<String> = emptyList(),
|
||||
override val flowMonitorPeriodMillis: Duration = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS,
|
||||
override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS,
|
||||
override val cordappDirectories: List<Path> = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
|
||||
override val cordappDirectories: List<Path> = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT),
|
||||
override val jmxReporterType: JmxReporterType? = JmxReporterType.JOLOKIA
|
||||
) : NodeConfiguration {
|
||||
companion object {
|
||||
private val logger = loggerFor<NodeConfigurationImpl>()
|
||||
|
@ -10,29 +10,43 @@
|
||||
|
||||
package net.corda.node.services.logging
|
||||
|
||||
import net.corda.core.context.Actor
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.context.InvocationOrigin
|
||||
import net.corda.core.context.Trace
|
||||
import org.slf4j.MDC
|
||||
|
||||
internal fun InvocationContext.pushToLoggingContext() {
|
||||
|
||||
MDC.put("invocation_id", trace.invocationId.value)
|
||||
MDC.put("invocation_timestamp", trace.invocationId.timestamp.toString())
|
||||
MDC.put("session_id", trace.sessionId.value)
|
||||
MDC.put("session_timestamp", trace.sessionId.timestamp.toString())
|
||||
actor?.let {
|
||||
MDC.put("actor_id", it.id.value)
|
||||
MDC.put("actor_store_id", it.serviceId.value)
|
||||
MDC.put("actor_owningIdentity", it.owningLegalIdentity.toString())
|
||||
trace.pushToLoggingContext()
|
||||
actor?.pushToLoggingContext()
|
||||
origin.pushToLoggingContext()
|
||||
externalTrace?.pushToLoggingContext("external_")
|
||||
impersonatedActor?.pushToLoggingContext("impersonating_")
|
||||
}
|
||||
|
||||
internal fun Trace.pushToLoggingContext(prefix: String = "") {
|
||||
|
||||
MDC.getMDCAdapter().apply {
|
||||
put("${prefix}invocation_id", invocationId.value)
|
||||
put("${prefix}invocation_timestamp", invocationId.timestamp.toString())
|
||||
put("${prefix}session_id", sessionId.value)
|
||||
put("${prefix}session_timestamp", sessionId.timestamp.toString())
|
||||
}
|
||||
externalTrace?.let {
|
||||
MDC.put("external_invocation_id", it.invocationId.value)
|
||||
MDC.put("external_invocation_timestamp", it.invocationId.timestamp.toString())
|
||||
MDC.put("external_session_id", it.sessionId.value)
|
||||
MDC.put("external_session_timestamp", it.sessionId.timestamp.toString())
|
||||
}
|
||||
|
||||
internal fun Actor.pushToLoggingContext(prefix: String = "") {
|
||||
|
||||
MDC.getMDCAdapter().apply {
|
||||
put("${prefix}actor_id", id.value)
|
||||
put("${prefix}actor_store_id", serviceId.value)
|
||||
put("${prefix}actor_owning_identity", owningLegalIdentity.toString())
|
||||
}
|
||||
impersonatedActor?.let {
|
||||
MDC.put("impersonating_actor_id", it.id.value)
|
||||
MDC.put("impersonating_actor_store_id", it.serviceId.value)
|
||||
MDC.put("impersonating_actor_owningIdentity", it.owningLegalIdentity.toString())
|
||||
}
|
||||
|
||||
internal fun InvocationOrigin.pushToLoggingContext(prefix: String = "") {
|
||||
|
||||
MDC.getMDCAdapter().apply {
|
||||
put("${prefix}origin", principal().name)
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
|
||||
class NetworkMapCacheImpl(
|
||||
private val networkMapCacheBase: NetworkMapCacheBaseInternal,
|
||||
private val identityService: IdentityService,
|
||||
private val database: CordaPersistence
|
||||
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
fun start() {
|
||||
for (nodeInfo in networkMapCacheBase.allNodes) {
|
||||
for (identity in nodeInfo.legalIdentitiesAndCerts) {
|
||||
identityService.verifyAndRegisterIdentity(identity)
|
||||
}
|
||||
}
|
||||
networkMapCacheBase.changed.subscribe { mapChange ->
|
||||
// TODO how should we handle network map removal
|
||||
if (mapChange is NetworkMapCache.MapChange.Added) {
|
||||
mapChange.node.legalIdentitiesAndCerts.forEach {
|
||||
try {
|
||||
identityService.verifyAndRegisterIdentity(it)
|
||||
} catch (ignore: Exception) {
|
||||
// Log a warning to indicate node info is not added to the network map cache.
|
||||
logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
|
||||
return database.transaction {
|
||||
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
|
||||
wellKnownParty?.let {
|
||||
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -13,7 +13,6 @@ package net.corda.node.services.network
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
@ -22,7 +21,6 @@ import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.NotaryInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -32,7 +30,6 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
@ -44,45 +41,10 @@ import java.security.PublicKey
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
class NetworkMapCacheImpl(
|
||||
private val networkMapCacheBase: NetworkMapCacheBaseInternal,
|
||||
private val identityService: IdentityService,
|
||||
private val database: CordaPersistence
|
||||
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
fun start() {
|
||||
networkMapCacheBase.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } }
|
||||
networkMapCacheBase.changed.subscribe { mapChange ->
|
||||
// TODO how should we handle network map removal
|
||||
if (mapChange is MapChange.Added) {
|
||||
mapChange.node.legalIdentitiesAndCerts.forEach {
|
||||
try {
|
||||
identityService.verifyAndRegisterIdentity(it)
|
||||
} catch (ignore: Exception) {
|
||||
// Log a warning to indicate node info is not added to the network map cache.
|
||||
logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
|
||||
return database.transaction {
|
||||
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
|
||||
wellKnownParty?.let {
|
||||
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Database-based network map cache. */
|
||||
@ThreadSafe
|
||||
open class PersistentNetworkMapCache(private val database: CordaPersistence) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
|
||||
open class PersistentNetworkMapCache(private val database: CordaPersistence,
|
||||
private val myLegalName: CordaX500Name) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
@ -115,6 +77,15 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S
|
||||
|
||||
fun start(notaries: List<NotaryInfo>) {
|
||||
this.notaries = notaries
|
||||
val otherNodeInfoCount = database.transaction {
|
||||
session.createQuery(
|
||||
"select count(*) from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n join n.legalIdentitiesAndCerts i where i.name != :myLegalName")
|
||||
.setParameter("myLegalName", myLegalName.toString())
|
||||
.singleResult as Long
|
||||
}
|
||||
if (otherNodeInfoCount > 0) {
|
||||
_nodeReady.set(null)
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? {
|
||||
@ -212,7 +183,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S
|
||||
logger.info("Previous node was identical to incoming one - doing nothing")
|
||||
}
|
||||
}
|
||||
_nodeReady.set(null)
|
||||
if (node.legalIdentities[0].name != myLegalName) {
|
||||
_nodeReady.set(null)
|
||||
}
|
||||
logger.debug { "Done adding node with info: $node" }
|
||||
}
|
||||
|
||||
|
@ -49,3 +49,5 @@ flowTimeout {
|
||||
maxRestartCount = 6
|
||||
backoffBase = 1.8
|
||||
}
|
||||
jmxReporterType = JOLOKIA
|
||||
|
||||
|
@ -0,0 +1,52 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
|
||||
class NodeRestartTests {
|
||||
private val mockNet = InternalMockNetwork(threadPerNode = true, autoVisibleNodes = false, notarySpecs = emptyList())
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `restart with no network map cache update`() {
|
||||
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
bob.registerInitiatedFlow(Responder::class.java)
|
||||
alice.services.networkMapCache.addNode(bob.info)
|
||||
bob.services.networkMapCache.addNode(alice.info)
|
||||
val alice2 = mockNet.restartNode(alice)
|
||||
val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
assertThat(result).isEqualTo(123)
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class Initiator(private val otherSide: Party) : FlowLogic<Int>() {
|
||||
@Suspendable
|
||||
override fun call(): Int = initiateFlow(otherSide).receive<Int>().unwrap { it }
|
||||
}
|
||||
|
||||
@InitiatedBy(Initiator::class)
|
||||
private class Responder(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() = otherSide.send(123)
|
||||
}
|
||||
}
|
@ -255,6 +255,29 @@ class NodeConfigurationImplTest {
|
||||
assertEquals(compatibilityZoneURL, configuration.networkServices!!.networkMapURL)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `jmxReporterType is null and defaults to Jokolia`() {
|
||||
var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true)))
|
||||
val nodeConfig = rawConfig.parseAsNodeConfiguration()
|
||||
assertTrue(JmxReporterType.JOLOKIA.toString() == nodeConfig.jmxReporterType.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `jmxReporterType is not null and is set to New Relic`() {
|
||||
var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true)))
|
||||
rawConfig = rawConfig.withValue("jmxReporterType", ConfigValueFactory.fromAnyRef("NEW_RELIC"))
|
||||
val nodeConfig = rawConfig.parseAsNodeConfiguration()
|
||||
assertTrue(JmxReporterType.NEW_RELIC.toString() == nodeConfig.jmxReporterType.toString())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `jmxReporterType is not null and set to Jokolia`() {
|
||||
var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true)))
|
||||
rawConfig = rawConfig.withValue("jmxReporterType", ConfigValueFactory.fromAnyRef("JOLOKIA"))
|
||||
val nodeConfig = rawConfig.parseAsNodeConfiguration()
|
||||
assertTrue(JmxReporterType.JOLOKIA.toString() == nodeConfig.jmxReporterType.toString())
|
||||
}
|
||||
|
||||
private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?): NodeConfiguration {
|
||||
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ data class MockNodeArgs(
|
||||
val version: VersionInfo = MOCK_VERSION_INFO
|
||||
)
|
||||
|
||||
// TODO We don't need a parameters object as this is internal only
|
||||
data class InternalMockNodeParameters(
|
||||
val forcedID: Int? = null,
|
||||
val legalName: CordaX500Name? = null,
|
||||
@ -157,7 +158,8 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
val testDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
|
||||
val networkParameters: NetworkParameters = testNetworkParameters(),
|
||||
val defaultFactory: (MockNodeArgs, CordappLoader?) -> MockNode = { args, cordappLoader -> cordappLoader?.let { MockNode(args, it) } ?: MockNode(args) },
|
||||
val cordappsForAllNodes: Set<TestCorDapp> = emptySet()) : AutoCloseable {
|
||||
val cordappsForAllNodes: Set<TestCorDapp> = emptySet(),
|
||||
val autoVisibleNodes: Boolean = true) : AutoCloseable {
|
||||
init {
|
||||
// Apache SSHD for whatever reason registers a SFTP FileSystemProvider - which gets loaded by JimFS.
|
||||
// This SFTP support loads BouncyCastle, which we want to avoid.
|
||||
@ -369,6 +371,7 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
}
|
||||
|
||||
private fun advertiseNodeToNetwork(newNode: TestStartedNode) {
|
||||
if (!mockNet.autoVisibleNodes) return
|
||||
mockNet.nodes
|
||||
.mapNotNull { it.started }
|
||||
.forEach { existingNode ->
|
||||
|
Loading…
x
Reference in New Issue
Block a user