diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 9c0e79e1ce..35fdfaad3f 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -287,6 +287,13 @@ absolute path to the node's base directory. :flowMonitorSuspensionLoggingThresholdMillis: Threshold ``Duration`` suspended flows waiting for IO need to exceed before they are logged. Default value is ``60 seconds``. +:jmxReporterType: Provides an option for registering an alternative JMX reporter. Available options are ``JOLOKIA`` and ``NEW_RELIC``. If no value is provided, ``JOLOKIA`` will be used. + + .. note:: The Jolokia configuration is provided by default. The New Relic configuration leverages the Dropwizard_ NewRelicReporter solution. See `Introduction to New Relic for Java`_ for details on how to get started and how to install the New Relic Java agent. + + .. _Dropwizard: https://metrics.dropwizard.io/3.2.3/manual/third-party.html + .. _Introduction to New Relic for Java: https://docs.newrelic.com/docs/agents/java-agent/getting-started/introduction-new-relic-java + :enterpriseConfiguration: Allows fine-grained controls of various features only available in the enterprise version of Corda. :tuning: Performance tuning parameters for Corda Enterprise diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index e1e49dc134..db4c405234 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -117,8 +117,9 @@ The current set of network parameters: :epoch: Version number of the network parameters. Starting from 1, this will always increment whenever any of the parameters change. + :whitelistedContractImplementations: List of whitelisted versions of contract code. - For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract. + For each contract class there is a list of SHA-256 hashes of the approved CorDapp jar versions containing that contract. Read more about *Zone constraints* here :doc:`api-contract-constraints` :eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt index edc4a1cdd2..fc6be347ec 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt @@ -97,7 +97,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri identityService.database = database } - private val persistentNetworkMapCache = PersistentNetworkMapCache(database) + private val persistentNetworkMapCache = PersistentNetworkMapCache(database, myInfo.legalIdentities[0].name) override val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize() private val checkpointStorage = DBCheckpointStorage() @Suppress("LeakingThis") diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index ee86116d96..8c85fa1c1b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -357,7 +357,7 @@ class NetworkBootstrapper } } else { NetworkParameters( - minimumPlatformVersion = 1, + minimumPlatformVersion = 4, notaries = notaryInfos, modifiedTime = Instant.now(), maxMessageSize = 10485760, diff --git a/node/build.gradle b/node/build.gradle index bb578015f3..d65efa0db7 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -237,6 +237,8 @@ dependencies { // Jolokia JVM monitoring agent, required to push logs through slf4j compile "org.jolokia:jolokia-jvm:${jolokia_version}:agent" + // Optional New Relic JVM reporter, used to push metrics to the configured account associated with a newrelic.yml configuration. See https://mvnrepository.com/artifact/com.palominolabs.metrics/metrics-new-relic + compile group: 'com.palominolabs.metrics', name: 'metrics-new-relic', version: '1.1.1' // Allow access to simple SOCKS Server for integration testing testCompile("io.netty:netty-example:$netty_version") { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 1951e2a578..899b297b49 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -91,7 +91,7 @@ class ArtemisMessagingTest { } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeInternalTestDataSourceProperties(configSupplier = { ConfigFactory.empty() }), DatabaseConfig(runMigration = true), { null }, { null }) - val persistentNetworkMapCache = PersistentNetworkMapCache(database).apply { start(emptyList()) } + val persistentNetworkMapCache = PersistentNetworkMapCache(database, ALICE_NAME).apply { start(emptyList()) } networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, rigorousMock(), database).apply { start() } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index 88ac9b4c30..d336f3005e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -10,127 +10,171 @@ package net.corda.node.services.network -import net.corda.core.crypto.generateKeyPair -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.internal.configureDatabase +import net.corda.node.internal.schemas.NodeInfoSchemaV1 +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.testing.core.* +import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.toDatabaseSchemaName -import net.corda.node.internal.NodeWithInfo -import net.corda.testing.core.* -import net.corda.testing.node.internal.NodeBasedTest +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.internal.makeTestDatabaseProperties import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType -import org.junit.Before -import org.junit.ClassRule -import org.junit.Test -import kotlin.test.assertEquals +import org.assertj.core.api.Assertions.assertThatIllegalArgumentException +import org.junit.* -// TODO Clean up these tests, they were written with old network map design in place. -class PersistentNetworkMapCacheTest : NodeBasedTest() { - companion object { - val ALICE = TestIdentity(ALICE_NAME, 70).party - val BOB = TestIdentity(BOB_NAME, 80).party - val DUMMY_REGULATOR = TestIdentity(CordaX500Name("Regulator A", "Paris", "FR"), 100).party +class PersistentNetworkMapCacheTest : IntegrationTest() { + private companion object { + val ALICE = TestIdentity(ALICE_NAME, 70) + val BOB = TestIdentity(BOB_NAME, 80) + val CHARLIE = TestIdentity(CHARLIE_NAME, 90) @ClassRule @JvmField - val databaseSchemas = IntegrationTestSchemas(DUMMY_REGULATOR.name.toDatabaseSchemaName(), ALICE.name.toDatabaseSchemaName(), - BOB.name.toDatabaseSchemaName()) + val databaseSchemas = IntegrationTestSchemas(CHARLIE_NAME.toDatabaseSchemaName()) } - private val partiesList = listOf(DUMMY_REGULATOR, ALICE, BOB) - private val addressesMap = HashMap() - private val infos = HashSet() - @Before - fun start() { - val nodes = startNodesWithPort(partiesList) - nodes.forEach { - infos.add(it.info) - addressesMap[it.info.singleIdentity().name] = it.info.addresses[0] - it.dispose() // We want them to communicate with NetworkMapService to save data to cache. + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private var portCounter = 1000 + //Enterprise only - objects created in the setup method, below initialized with dummy values to avoid need for nullable type declaration + private var database = CordaPersistence(DatabaseConfig(), emptySet()) + private var charlieNetMapCache = PersistentNetworkMapCache(database, CHARLIE.name) + + @Before() + fun setup() { + //Enterprise only - for test in database mode ensure the remote database is setup before creating CordaPersistence + super.setUp() + database = configureDatabase(makeTestDataSourceProperties(CHARLIE_NAME.toDatabaseSchemaName()), makeTestDatabaseProperties(CHARLIE_NAME.toDatabaseSchemaName()), { null }, { null }) + charlieNetMapCache = PersistentNetworkMapCache(database, CHARLIE.name) + } + + @After + fun cleanUp() { + database.close() + } + + @Test + fun addNode() { + val alice = createNodeInfo(listOf(ALICE)) + charlieNetMapCache.addNode(alice) + assertThat(charlieNetMapCache.nodeReady).isDone() + val fromDb = database.transaction { + session.createQuery( + "from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name}", + NodeInfoSchemaV1.PersistentNodeInfo::class.java + ).resultList.map { it.toNodeInfo() } } + assertThat(fromDb).containsOnly(alice) + } + + @Test + fun `adding the node's own node-info doesn't complete the nodeReady future`() { + val charlie = createNodeInfo(listOf(CHARLIE)) + charlieNetMapCache.addNode(charlie) + assertThat(charlieNetMapCache.nodeReady).isNotDone() + assertThat(charlieNetMapCache.getNodeByLegalName(CHARLIE.name)).isEqualTo(charlie) + } + + @Test + fun `starting with just the node's own node-info in the db`() { + val charlie = createNodeInfo(listOf(CHARLIE)) + saveNodeInfoIntoDb(charlie) + assertThat(charlieNetMapCache.allNodes).containsOnly(charlie) + charlieNetMapCache.start(emptyList()) + assertThat(charlieNetMapCache.nodeReady).isNotDone() + } + + @Test + fun `starting with another node-info in the db`() { + val alice = createNodeInfo(listOf(ALICE)) + saveNodeInfoIntoDb(alice) + assertThat(charlieNetMapCache.allNodes).containsOnly(alice) + charlieNetMapCache.start(emptyList()) + assertThat(charlieNetMapCache.nodeReady).isDone() } @Test fun `unknown legal name`() { - val alice = startNodesWithPort(listOf(ALICE))[0] - val netMapCache = alice.services.networkMapCache - assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty() - assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull() - assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull() - assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull() + charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty() + assertThat(charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull() + assertThat(charlieNetMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull() + assertThat(charlieNetMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull() } @Test fun `nodes in distributed service`() { - val alice = startNodesWithPort(listOf(ALICE))[0] - val netMapCache = alice.services.networkMapCache + charlieNetMapCache.addNode(createNodeInfo(listOf(ALICE))) + + val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME) - val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity val distServiceNodeInfos = (1..2).map { - val nodeInfo = NodeInfo( - addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)), - legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity), - platformVersion = 3, - serial = 1 - ) - netMapCache.addNode(nodeInfo) + val nodeInfo = createNodeInfo(identities = listOf(TestIdentity.fresh("Org-$it"), distributedIdentity)) + charlieNetMapCache.addNode(nodeInfo) nodeInfo } - assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos) - assertThatExceptionOfType(IllegalArgumentException::class.java) - .isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) } + assertThat(charlieNetMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos) + assertThatIllegalArgumentException() + .isThrownBy { charlieNetMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) } .withMessageContaining(DUMMY_NOTARY_NAME.toString()) } @Test fun `get nodes by owning key and by name`() { - val alice = startNodesWithPort(listOf(ALICE))[0] - val netCache = alice.services.networkMapCache - val res = netCache.getNodeByLegalIdentity(alice.info.singleIdentity()) - assertEquals(alice.info, res) - val res2 = netCache.getNodeByLegalName(DUMMY_REGULATOR.name) - assertEquals(infos.singleOrNull { DUMMY_REGULATOR.name in it.legalIdentities.map { it.name } }, res2) + val alice = createNodeInfo(listOf(ALICE)) + charlieNetMapCache.addNode(alice) + assertThat(charlieNetMapCache.getNodesByLegalIdentityKey(ALICE.publicKey)).containsOnly(alice) + assertThat(charlieNetMapCache.getNodeByLegalName(ALICE.name)).isEqualTo(alice) } @Test fun `get nodes by address`() { - val alice = startNodesWithPort(listOf(ALICE))[0] - val netCache = alice.services.networkMapCache - val res = netCache.getNodeByAddress(alice.info.addresses[0]) - assertEquals(alice.info, res) + val alice = createNodeInfo(listOf(ALICE)) + charlieNetMapCache.addNode(alice) + assertThat(charlieNetMapCache.getNodeByAddress(alice.addresses[0])).isEqualTo(alice) } - // This test has to be done as normal node not mock, because MockNodes don't have addresses. @Test fun `insert two node infos with the same host and port`() { - val aliceNode = startNode(ALICE_NAME) - val charliePartyCert = getTestPartyAndCertificate(CHARLIE_NAME, generateKeyPair().public) - val aliceCache = aliceNode.services.networkMapCache - aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(charliePartyCert))) - val res = aliceCache.allNodes.filter { aliceNode.info.addresses[0] in it.addresses } - assertEquals(2, res.size) + val alice = createNodeInfo(listOf(ALICE)) + charlieNetMapCache.addNode(alice) + val bob = createNodeInfo(listOf(BOB), address = alice.addresses[0]) + charlieNetMapCache.addNode(bob) + val nodeInfos = charlieNetMapCache.allNodes.filter { alice.addresses[0] in it.addresses } + assertThat(nodeInfos).hasSize(2) } - @Test - fun `restart node with DB map cache`() { - val alice = startNodesWithPort(listOf(ALICE))[0] - val partyNodes = alice.services.networkMapCache.allNodes - assertEquals(infos.size, partyNodes.size) - assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet()) + private fun createNodeInfo(identities: List, + address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo { + return NodeInfo( + addresses = listOf(address), + legalIdentitiesAndCerts = identities.map { it.identity }, + platformVersion = 3, + serial = 1 + ) } - // HELPERS - // Helper function to restart nodes with the same host and port. - private fun startNodesWithPort(nodesToStart: List, customRetryIntervalMs: Long? = null): List { - return nodesToStart.map { party -> - val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) + - (customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap()) - startNode(party.name, configOverrides = configOverrides) + private fun saveNodeInfoIntoDb(nodeInfo: NodeInfo) { + database.transaction { + session.save(NodeInfoSchemaV1.PersistentNodeInfo( + id = 0, + hash = nodeInfo.serialize().hash.toString(), + addresses = nodeInfo.addresses.map { NodeInfoSchemaV1.DBHostAndPort.fromHostAndPort(it) }, + legalIdentitiesAndCerts = nodeInfo.legalIdentitiesAndCerts.mapIndexed { idx, elem -> + NodeInfoSchemaV1.DBPartyAndCertificate(elem, isMain = idx == 0) + }, + platformVersion = nodeInfo.platformVersion, + serial = nodeInfo.serial + )) } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 5b6174759c..b7c1de642d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -153,7 +153,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // TODO Break cyclic dependency identityService.database = database } - private val persistentNetworkMapCache = PersistentNetworkMapCache(database) + private val persistentNetworkMapCache = PersistentNetworkMapCache(database, configuration.myLegalName) val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize() val checkpointStorage = DBCheckpointStorage() @Suppress("LeakingThis") @@ -262,7 +262,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, open fun generateAndSaveNodeInfo(): NodeInfo { check(started == null) { "Node has already been started" } log.info("Generating nodeInfo ...") - persistentNetworkMapCache.start(notaries = emptyList()) val trustRoot = initKeyStore() val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) startDatabase() @@ -270,6 +269,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, identityService.start(trustRoot, listOf(identity.certificate, nodeCa)) return database.use { it.transaction { + persistentNetworkMapCache.start(notaries = emptyList()) val (_, nodeInfoAndSigned) = updateNodeInfo(identity, identityKeyPair, publish = false) nodeInfoAndSigned.nodeInfo } @@ -279,9 +279,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, fun clearNetworkMapCache() { Node.printBasicNodeInfo("Clearing network map cache entries") log.info("Starting clearing of network map cache entries...") - persistentNetworkMapCache.start(notaries = emptyList()) startDatabase() database.use { + persistentNetworkMapCache.start(notaries = emptyList()) persistentNetworkMapCache.clearNetworkMapCache() } } @@ -317,7 +317,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, "Node's platform version is lower than network's required minimumPlatformVersion" } servicesForResolution.start(netParams) - persistentNetworkMapCache.start(netParams.notaries) startDatabase() val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) @@ -339,6 +338,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val (keyPairs, nodeInfoAndSigned, myNotaryIdentity) = database.transaction { + persistentNetworkMapCache.start(netParams.notaries) networkMapCache.start() updateNodeInfo(identity, identityKeyPair, publish = true) } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index a3e30beb7f..16e99e75b8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -11,6 +11,10 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter +import com.codahale.metrics.MetricFilter +import com.codahale.metrics.MetricRegistry +import com.palominolabs.metrics.newrelic.AllEnabledMetricAttributeFilter +import com.palominolabs.metrics.newrelic.NewRelicReporter import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.flows.FlowLogic @@ -52,6 +56,7 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.SecurityConfiguration import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.config.shouldStartLocalShell +import net.corda.node.services.config.JmxReporterType import net.corda.node.services.messaging.* import net.corda.node.services.rpc.ArtemisRpcBroker import net.corda.node.utilities.AddressUtils @@ -74,11 +79,12 @@ import rx.schedulers.Schedulers import java.net.BindException import java.net.InetAddress import java.nio.file.Path +import java.nio.file.Paths import java.time.Clock import java.util.concurrent.atomic.AtomicInteger import javax.management.ObjectName import kotlin.system.exitProcess -import java.nio.file.Paths +import java.util.concurrent.TimeUnit class NodeWithInfo(val node: Node, val info: NodeInfo) { val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by node.services, FlowStarter by node.flowStarter {} @@ -411,18 +417,8 @@ open class Node(configuration: NodeConfiguration, val nodeInfo: NodeInfo = super.start() nodeReadyFuture.thenMatch({ serverThread.execute { - // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: - // - // https://jolokia.org/agent/jvm.html - JmxReporter.forRegistry(services.monitoringService.metrics).inDomain("net.corda").createsObjectNamesWith { _, domain, name -> - // Make the JMX hierarchy a bit better organised. - val category = name.substringBefore('.') - val subName = name.substringAfter('.', "") - if (subName == "") - ObjectName("$domain:name=$category") - else - ObjectName("$domain:type=$category,name=$subName") - }.build().start() + + registerJmxReporter(services.monitoringService.metrics) _startupComplete.set(Unit) } @@ -435,6 +431,47 @@ open class Node(configuration: NodeConfiguration, return nodeInfo } + /** + * A hook to allow configuration override of the JmxReporter being used. + */ + fun registerJmxReporter(metrics: MetricRegistry) { + log.info("Registering JMX reporter:") + when (configuration.jmxReporterType) { + JmxReporterType.JOLOKIA -> registerJolokiaReporter(metrics) + JmxReporterType.NEW_RELIC -> registerNewRelicReporter(metrics) + } + } + + private fun registerJolokiaReporter(registry: MetricRegistry) { + log.info("Registering Jolokia JMX reporter:") + // Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia: + // + // https://jolokia.org/agent/jvm.html + JmxReporter.forRegistry(registry).inDomain("net.corda").createsObjectNamesWith { _, domain, name -> + // Make the JMX hierarchy a bit better organised. + val category = name.substringBefore('.') + val subName = name.substringAfter('.', "") + if (subName == "") + ObjectName("$domain:name=$category") + else + ObjectName("$domain:type=$category,name=$subName") + }.build().start() + } + + private fun registerNewRelicReporter (registry: MetricRegistry) { + log.info("Registering New Relic JMX Reporter:") + val reporter = NewRelicReporter.forRegistry(registry) + .name("New Relic Reporter") + .filter(MetricFilter.ALL) + .attributeFilter(AllEnabledMetricAttributeFilter()) + .rateUnit(TimeUnit.SECONDS) + .durationUnit(TimeUnit.MILLISECONDS) + .metricNamePrefix("corda/") + .build() + + reporter.start(1, TimeUnit.MINUTES) + } + override val rxIoScheduler: Scheduler get() = Schedulers.io() private fun initialiseSerialization() { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 44df33f1a6..41648e3488 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -86,6 +86,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS val cordappDirectories: List get() = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT) + val jmxReporterType : JmxReporterType? get() = defaultJmxReporterType fun validate(): List @@ -102,9 +103,18 @@ interface NodeConfiguration : NodeSSLConfiguration { const val defaultAttachmentCacheBound = 1024L const val cordappDirectoriesKey = "cordappDirectories" + + val defaultJmxReporterType = JmxReporterType.JOLOKIA } } +/** + * Currently registered JMX Reporters + */ +enum class JmxReporterType { + JOLOKIA, NEW_RELIC +} + data class DevModeOptions(val disableCheckpointChecker: Boolean = false, val allowCompatibilityZone: Boolean = false) data class GraphiteOptions( @@ -268,7 +278,8 @@ data class NodeConfigurationImpl( private val jarDirs: List = emptyList(), override val flowMonitorPeriodMillis: Duration = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS, override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS, - override val cordappDirectories: List = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT) + override val cordappDirectories: List = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT), + override val jmxReporterType: JmxReporterType? = JmxReporterType.JOLOKIA ) : NodeConfiguration { companion object { private val logger = loggerFor() diff --git a/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt b/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt index 807bd068b7..a3cf330731 100644 --- a/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt +++ b/node/src/main/kotlin/net/corda/node/services/logging/ContextualLoggingUtils.kt @@ -10,29 +10,43 @@ package net.corda.node.services.logging +import net.corda.core.context.Actor import net.corda.core.context.InvocationContext +import net.corda.core.context.InvocationOrigin +import net.corda.core.context.Trace import org.slf4j.MDC internal fun InvocationContext.pushToLoggingContext() { - MDC.put("invocation_id", trace.invocationId.value) - MDC.put("invocation_timestamp", trace.invocationId.timestamp.toString()) - MDC.put("session_id", trace.sessionId.value) - MDC.put("session_timestamp", trace.sessionId.timestamp.toString()) - actor?.let { - MDC.put("actor_id", it.id.value) - MDC.put("actor_store_id", it.serviceId.value) - MDC.put("actor_owningIdentity", it.owningLegalIdentity.toString()) + trace.pushToLoggingContext() + actor?.pushToLoggingContext() + origin.pushToLoggingContext() + externalTrace?.pushToLoggingContext("external_") + impersonatedActor?.pushToLoggingContext("impersonating_") +} + +internal fun Trace.pushToLoggingContext(prefix: String = "") { + + MDC.getMDCAdapter().apply { + put("${prefix}invocation_id", invocationId.value) + put("${prefix}invocation_timestamp", invocationId.timestamp.toString()) + put("${prefix}session_id", sessionId.value) + put("${prefix}session_timestamp", sessionId.timestamp.toString()) } - externalTrace?.let { - MDC.put("external_invocation_id", it.invocationId.value) - MDC.put("external_invocation_timestamp", it.invocationId.timestamp.toString()) - MDC.put("external_session_id", it.sessionId.value) - MDC.put("external_session_timestamp", it.sessionId.timestamp.toString()) +} + +internal fun Actor.pushToLoggingContext(prefix: String = "") { + + MDC.getMDCAdapter().apply { + put("${prefix}actor_id", id.value) + put("${prefix}actor_store_id", serviceId.value) + put("${prefix}actor_owning_identity", owningLegalIdentity.toString()) } - impersonatedActor?.let { - MDC.put("impersonating_actor_id", it.id.value) - MDC.put("impersonating_actor_store_id", it.serviceId.value) - MDC.put("impersonating_actor_owningIdentity", it.owningLegalIdentity.toString()) +} + +internal fun InvocationOrigin.pushToLoggingContext(prefix: String = "") { + + MDC.getMDCAdapter().apply { + put("${prefix}origin", principal().name) } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapCacheImpl.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapCacheImpl.kt new file mode 100644 index 0000000000..c31f089a07 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapCacheImpl.kt @@ -0,0 +1,51 @@ +package net.corda.node.services.network + +import net.corda.core.identity.AbstractParty +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.IdentityService +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.contextLogger +import net.corda.node.services.api.NetworkMapCacheBaseInternal +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.nodeapi.internal.persistence.CordaPersistence + +class NetworkMapCacheImpl( + private val networkMapCacheBase: NetworkMapCacheBaseInternal, + private val identityService: IdentityService, + private val database: CordaPersistence +) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() { + companion object { + private val logger = contextLogger() + } + + fun start() { + for (nodeInfo in networkMapCacheBase.allNodes) { + for (identity in nodeInfo.legalIdentitiesAndCerts) { + identityService.verifyAndRegisterIdentity(identity) + } + } + networkMapCacheBase.changed.subscribe { mapChange -> + // TODO how should we handle network map removal + if (mapChange is NetworkMapCache.MapChange.Added) { + mapChange.node.legalIdentitiesAndCerts.forEach { + try { + identityService.verifyAndRegisterIdentity(it) + } catch (ignore: Exception) { + // Log a warning to indicate node info is not added to the network map cache. + logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.") + } + } + } + } + } + + override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { + return database.transaction { + val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party) + wellKnownParty?.let { + getNodesByLegalIdentityKey(it.owningKey).firstOrNull() + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index b8b807da47..1425ca74d0 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -13,7 +13,6 @@ package net.corda.node.services.network import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.SecureHash import net.corda.core.crypto.toStringShort -import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate @@ -22,7 +21,6 @@ import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed import net.corda.core.node.NodeInfo import net.corda.core.node.NotaryInfo -import net.corda.core.node.services.IdentityService import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SingletonSerializeAsToken @@ -32,7 +30,6 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.node.services.api.NetworkMapCacheBaseInternal -import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.utilities.NonInvalidatingCache import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit @@ -44,45 +41,10 @@ import java.security.PublicKey import java.util.* import javax.annotation.concurrent.ThreadSafe -class NetworkMapCacheImpl( - private val networkMapCacheBase: NetworkMapCacheBaseInternal, - private val identityService: IdentityService, - private val database: CordaPersistence -) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() { - companion object { - private val logger = contextLogger() - } - - fun start() { - networkMapCacheBase.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } } - networkMapCacheBase.changed.subscribe { mapChange -> - // TODO how should we handle network map removal - if (mapChange is MapChange.Added) { - mapChange.node.legalIdentitiesAndCerts.forEach { - try { - identityService.verifyAndRegisterIdentity(it) - } catch (ignore: Exception) { - // Log a warning to indicate node info is not added to the network map cache. - logger.warn("Node info for :'${it.name}' is not added to the network map due to verification error.") - } - } - } - } - } - - override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? { - return database.transaction { - val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party) - wellKnownParty?.let { - getNodesByLegalIdentityKey(it.owningKey).firstOrNull() - } - } - } -} - /** Database-based network map cache. */ @ThreadSafe -open class PersistentNetworkMapCache(private val database: CordaPersistence) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal { +open class PersistentNetworkMapCache(private val database: CordaPersistence, + private val myLegalName: CordaX500Name) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal { companion object { private val logger = contextLogger() } @@ -115,6 +77,15 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S fun start(notaries: List) { this.notaries = notaries + val otherNodeInfoCount = database.transaction { + session.createQuery( + "select count(*) from ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n join n.legalIdentitiesAndCerts i where i.name != :myLegalName") + .setParameter("myLegalName", myLegalName.toString()) + .singleResult as Long + } + if (otherNodeInfoCount > 0) { + _nodeReady.set(null) + } } override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? { @@ -212,7 +183,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence) : S logger.info("Previous node was identical to incoming one - doing nothing") } } - _nodeReady.set(null) + if (node.legalIdentities[0].name != myLegalName) { + _nodeReady.set(null) + } logger.debug { "Done adding node with info: $node" } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index a11783b91f..ea933ce79e 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -49,3 +49,5 @@ flowTimeout { maxRestartCount = 6 backoffBase = 1.8 } +jmxReporterType = JOLOKIA + diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt b/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt new file mode 100644 index 0000000000..51f1e69bc7 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/NodeRestartTests.kt @@ -0,0 +1,52 @@ +package net.corda.node.internal + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.identity.Party +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.startFlow +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Test + +class NodeRestartTests { + private val mockNet = InternalMockNetwork(threadPerNode = true, autoVisibleNodes = false, notarySpecs = emptyList()) + + @After + fun cleanUp() { + mockNet.close() + } + + @Test + fun `restart with no network map cache update`() { + val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + val bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + bob.registerInitiatedFlow(Responder::class.java) + alice.services.networkMapCache.addNode(bob.info) + bob.services.networkMapCache.addNode(alice.info) + val alice2 = mockNet.restartNode(alice) + val result = alice2.services.startFlow(Initiator(bob.info.singleIdentity())).resultFuture.getOrThrow() + assertThat(result).isEqualTo(123) + } + + @InitiatingFlow + private class Initiator(private val otherSide: Party) : FlowLogic() { + @Suspendable + override fun call(): Int = initiateFlow(otherSide).receive().unwrap { it } + } + + @InitiatedBy(Initiator::class) + private class Responder(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() = otherSide.send(123) + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 2d2d0252ed..c8a5c39331 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -255,6 +255,29 @@ class NodeConfigurationImplTest { assertEquals(compatibilityZoneURL, configuration.networkServices!!.networkMapURL) } + @Test + fun `jmxReporterType is null and defaults to Jokolia`() { + var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true))) + val nodeConfig = rawConfig.parseAsNodeConfiguration() + assertTrue(JmxReporterType.JOLOKIA.toString() == nodeConfig.jmxReporterType.toString()) + } + + @Test + fun `jmxReporterType is not null and is set to New Relic`() { + var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true))) + rawConfig = rawConfig.withValue("jmxReporterType", ConfigValueFactory.fromAnyRef("NEW_RELIC")) + val nodeConfig = rawConfig.parseAsNodeConfiguration() + assertTrue(JmxReporterType.NEW_RELIC.toString() == nodeConfig.jmxReporterType.toString()) + } + + @Test + fun `jmxReporterType is not null and set to Jokolia`() { + var rawConfig = getConfig("working-config.conf", ConfigFactory.parseMap(mapOf("devMode" to true))) + rawConfig = rawConfig.withValue("jmxReporterType", ConfigValueFactory.fromAnyRef("JOLOKIA")) + val nodeConfig = rawConfig.parseAsNodeConfiguration() + assertTrue(JmxReporterType.JOLOKIA.toString() == nodeConfig.jmxReporterType.toString()) + } + private fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?): NodeConfiguration { return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 6b910619eb..32a6e91abd 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -92,6 +92,7 @@ data class MockNodeArgs( val version: VersionInfo = MOCK_VERSION_INFO ) +// TODO We don't need a parameters object as this is internal only data class InternalMockNodeParameters( val forcedID: Int? = null, val legalName: CordaX500Name? = null, @@ -157,7 +158,8 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe val testDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()), val networkParameters: NetworkParameters = testNetworkParameters(), val defaultFactory: (MockNodeArgs, CordappLoader?) -> MockNode = { args, cordappLoader -> cordappLoader?.let { MockNode(args, it) } ?: MockNode(args) }, - val cordappsForAllNodes: Set = emptySet()) : AutoCloseable { + val cordappsForAllNodes: Set = emptySet(), + val autoVisibleNodes: Boolean = true) : AutoCloseable { init { // Apache SSHD for whatever reason registers a SFTP FileSystemProvider - which gets loaded by JimFS. // This SFTP support loads BouncyCastle, which we want to avoid. @@ -369,6 +371,7 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe } private fun advertiseNodeToNetwork(newNode: TestStartedNode) { + if (!mockNet.autoVisibleNodes) return mockNet.nodes .mapNotNull { it.started } .forEach { existingNode ->