Merge pull request #6565 from corda/christians/ENT-5273-updateandfixes

ENT-5273 update and fixes
This commit is contained in:
Christian Sailer
2020-08-04 17:48:59 +01:00
committed by GitHub
9 changed files with 76 additions and 115 deletions

View File

@ -172,16 +172,30 @@ buildscript {
} }
} }
} else { } else {
maven {
url "${artifactory_contextUrl}/corda-dependencies-dev"
content {
includeGroupByRegex 'net\\.corda(\\..*)?'
includeGroupByRegex 'com\\.r3(\\..*)?'
}
mavenContent {
snapshotsOnly()
}
}
maven {
url "${artifactory_contextUrl}/corda-releases"
content {
includeGroupByRegex 'net\\.corda(\\..*)?'
includeGroupByRegex 'com\\.r3(\\..*)?'
}
}
mavenCentral() mavenCentral()
jcenter() jcenter()
maven { maven {
url 'https://kotlin.bintray.com/kotlinx' url 'https://kotlin.bintray.com/kotlinx'
} content {
maven { includeGroup 'org.jetbrains.kotlin'
url "${artifactory_contextUrl}/corda-dependencies-dev" }
}
maven {
url "${artifactory_contextUrl}/corda-releases"
} }
} }
} }
@ -205,7 +219,6 @@ buildscript {
// See https://github.com/corda/gradle-capsule-plugin // See https://github.com/corda/gradle-capsule-plugin
classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3" classpath "us.kirchmeier:gradle-capsule-plugin:1.0.4_r3"
classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true classpath group: "com.r3.testing", name: "gradle-distributed-testing-plugin", version: "1.2-LOCAL-K8S-SHARED-CACHE-SNAPSHOT", changing: true
classpath group: "com.r3.dependx", name: "gradle-dependx", version: "0.1.13", changing: true
classpath "com.bmuschko:gradle-docker-plugin:5.0.0" classpath "com.bmuschko:gradle-docker-plugin:5.0.0"
classpath "org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.8" classpath "org.sonarsource.scanner.gradle:sonarqube-gradle-plugin:2.8"
} }
@ -223,7 +236,6 @@ apply plugin: 'com.github.ben-manes.versions'
apply plugin: 'net.corda.plugins.publish-utils' apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory' apply plugin: 'com.jfrog.artifactory'
apply plugin: "com.bmuschko.docker-remote-api" apply plugin: "com.bmuschko.docker-remote-api"
apply plugin: "com.r3.dependx.dependxies"
// If the command line project option -PversionFromGit is added to the gradle invocation, we'll resolve // If the command line project option -PversionFromGit is added to the gradle invocation, we'll resolve
@ -390,11 +402,32 @@ allprojects {
} }
} }
} else { } else {
maven {
url "${artifactory_contextUrl}/corda-dependencies"
content {
includeGroupByRegex 'net\\.corda(\\..*)?'
includeGroupByRegex 'com\\.r3(\\..*)?'
includeGroup 'co.paralleluniverse'
includeGroup 'org.crashub'
includeGroup 'com.github.bft-smart'
}
}
maven {
url "${artifactory_contextUrl}/corda-dev"
content {
includeGroupByRegex 'net\\.corda(\\..*)?'
includeGroupByRegex 'com\\.r3(\\..*)?'
}
}
maven {
url 'https://repo.gradle.org/gradle/libs-releases'
content {
includeGroup 'org.gradle'
includeGroup 'com.github.detro'
}
}
mavenCentral() mavenCentral()
jcenter() jcenter()
maven { url "${artifactory_contextUrl}/corda-dependencies" }
maven { url 'https://repo.gradle.org/gradle/libs-releases' }
maven { url "${artifactory_contextUrl}/corda-dev" }
} }
} }
@ -655,11 +688,6 @@ artifactory {
} }
} }
dependxiesModule {
mode = "monitor"
skipTasks = "test,integrationTest,smokeTest,slowIntegrationTest"
}
tasks.register('generateApi', net.corda.plugins.apiscanner.GenerateApi) { tasks.register('generateApi', net.corda.plugins.apiscanner.GenerateApi) {
baseName = "api-corda" baseName = "api-corda"
} }
@ -722,7 +750,6 @@ ext.largeScaleSet = [
] ]
task allParallelIntegrationTest(type: ParallelTestGroup) { task allParallelIntegrationTest(type: ParallelTestGroup) {
dependsOn dependxiesModule
podLogLevel PodLogLevel.INFO podLogLevel PodLogLevel.INFO
testGroups "integrationTest" testGroups "integrationTest"
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
@ -733,7 +760,6 @@ task allParallelIntegrationTest(type: ParallelTestGroup) {
distribute DistributeTestsBy.METHOD distribute DistributeTestsBy.METHOD
} }
task allParallelUnitTest(type: ParallelTestGroup) { task allParallelUnitTest(type: ParallelTestGroup) {
dependsOn dependxiesModule
podLogLevel PodLogLevel.INFO podLogLevel PodLogLevel.INFO
testGroups "test" testGroups "test"
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
@ -744,7 +770,6 @@ task allParallelUnitTest(type: ParallelTestGroup) {
distribute DistributeTestsBy.CLASS distribute DistributeTestsBy.CLASS
} }
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
dependsOn dependxiesModule
testGroups "test", "integrationTest" testGroups "test", "integrationTest"
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
streamOutput generalPurpose.streamOutput streamOutput generalPurpose.streamOutput
@ -755,7 +780,6 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
} }
task parallelRegressionTest(type: ParallelTestGroup) { task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "smokeTest" testGroups "test", "integrationTest", "smokeTest"
dependsOn dependxiesModule
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
streamOutput generalPurpose.streamOutput streamOutput generalPurpose.streamOutput
coresPerFork generalPurpose.coresPerFork coresPerFork generalPurpose.coresPerFork
@ -765,7 +789,6 @@ task parallelRegressionTest(type: ParallelTestGroup) {
} }
task allParallelSmokeTest(type: ParallelTestGroup) { task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest" testGroups "smokeTest"
dependsOn dependxiesModule
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
streamOutput generalPurpose.streamOutput streamOutput generalPurpose.streamOutput
coresPerFork generalPurpose.coresPerFork coresPerFork generalPurpose.coresPerFork
@ -775,7 +798,6 @@ task allParallelSmokeTest(type: ParallelTestGroup) {
} }
task allParallelSlowIntegrationTest(type: ParallelTestGroup) { task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest" testGroups "slowIntegrationTest"
dependsOn dependxiesModule
numberOfShards generalPurpose.numberOfShards numberOfShards generalPurpose.numberOfShards
streamOutput generalPurpose.streamOutput streamOutput generalPurpose.streamOutput
coresPerFork generalPurpose.coresPerFork coresPerFork generalPurpose.coresPerFork

View File

@ -57,7 +57,7 @@ enum class TransactionIsolationLevel {
val jdbcValue: Int = java.sql.Connection::class.java.getField(jdbcString).get(null) as Int val jdbcValue: Int = java.sql.Connection::class.java.getField(jdbcString).get(null) as Int
companion object{ companion object{
val default = REPEATABLE_READ val default = READ_COMMITTED
} }
} }

View File

@ -1,32 +0,0 @@
package net.corda.node.endurance
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@RunWith(Parameterized::class)
class NodesStartStopSingleVmTests(@Suppress("unused") private val iteration: Int) {
companion object {
@JvmStatic
@Parameterized.Parameters(name = "iteration = {0}")
fun iterations(): Iterable<Array<Int>> {
return (1..60).map { arrayOf(it) }
}
}
@Test(timeout = 300_000)
fun nodesStartStop() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME)
val bob = startNode(providedName = BOB_NAME)
alice.getOrThrow()
bob.getOrThrow()
}
}
}

View File

@ -47,7 +47,7 @@ class DistributedServiceTests {
invokeRpc(CordaRPCOps::stateMachinesFeed)) invokeRpc(CordaRPCOps::stateMachinesFeed))
) )
driver(DriverParameters( driver(DriverParameters(
cordappsForAllNodes = FINANCE_CORDAPPS + cordappWithPackages("net.corda.notary.raft"), cordappsForAllNodes = FINANCE_CORDAPPS + cordappWithPackages(),
notarySpecs = listOf(NotarySpec( notarySpecs = listOf(NotarySpec(
DUMMY_NOTARY_NAME, DUMMY_NOTARY_NAME,
rpcUsers = listOf(testUser), rpcUsers = listOf(testUser),

View File

@ -42,7 +42,6 @@ class P2PMessagingTest {
private fun startDriverWithDistributedService(dsl: DriverDSL.(List<InProcess>) -> Unit) { private fun startDriverWithDistributedService(dsl: DriverDSL.(List<InProcess>) -> Unit) {
driver(DriverParameters( driver(DriverParameters(
startNodesInProcess = true, startNodesInProcess = true,
extraCordappPackagesToScan = listOf("net.corda.notary.raft"),
notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2))) notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))
)) { )) {
dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) }) dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) })

View File

@ -56,12 +56,6 @@ open class SharedNodeCmdLineOptions {
) )
var allowHibernateToManangeAppSchema: Boolean = false var allowHibernateToManangeAppSchema: Boolean = false
@Option(
names = ["--pause-all-flows"],
description = ["Do not run any flows on startup. Sets all flows to paused, which can be unpaused via RPC."]
)
var safeMode: Boolean = false
open fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> { open fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> {
val option = Configuration.Options(strict = unknownConfigKeysPolicy == UnknownConfigKeysPolicy.FAIL) val option = Configuration.Options(strict = unknownConfigKeysPolicy == UnknownConfigKeysPolicy.FAIL)
return configuration.parseAsNodeConfiguration(option) return configuration.parseAsNodeConfiguration(option)

View File

@ -95,7 +95,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
} }
private companion object { internal companion object {
const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions."
// Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here, // Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here,
// as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add // as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add
// to the memory pressure at all here. // to the memory pressure at all here.
@ -111,7 +113,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
} }
fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock) private fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock)
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> { : AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>( return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
cacheFactory = cacheFactory, cacheFactory = cacheFactory,
@ -221,12 +223,22 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> { override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
val (transaction, warning) = trackTransactionInternal(id)
warning?.also { log.warn(it) }
return transaction
}
if (contextTransactionOrNull != null) { /**
log.warn("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") * @return a pair of the signed transaction, and a string containing any warning.
*/
internal fun trackTransactionInternal(id: SecureHash): Pair<CordaFuture<SignedTransaction>, String?> {
val warning: String? = if (contextTransactionOrNull != null) {
TRANSACTION_ALREADY_IN_PROGRESS_WARNING
} else {
null
} }
return trackTransactionWithNoWarning(id) return Pair(trackTransactionWithNoWarning(id), warning)
} }
override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> { override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> {

View File

@ -280,7 +280,7 @@ class NodeConfigurationImplTest {
@Test(timeout=3_000) @Test(timeout=3_000)
fun `compatibilityZoneURL populates NetworkServices`() { fun `compatibilityZoneURL populates NetworkServices`() {
val compatibilityZoneURL = URI.create("https://r3.com").toURL() val compatibilityZoneURL = URI.create("https://r3.example.com").toURL()
val configuration = testConfiguration.copy( val configuration = testConfiguration.copy(
devMode = false, devMode = false,
compatibilityZoneURL = compatibilityZoneURL) compatibilityZoneURL = compatibilityZoneURL)

View File

@ -9,22 +9,22 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.CordaClock import net.corda.node.CordaClock
import net.corda.node.MutableClock import net.corda.node.MutableClock
import net.corda.node.SimpleClock import net.corda.node.SimpleClock
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.* import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.dummyCommand
import net.corda.testing.internal.LogHelper import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.createWireTransaction import net.corda.testing.internal.createWireTransaction
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.Appender
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.appender.WriterAppender
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Assert import org.junit.Assert
@ -32,10 +32,9 @@ import org.junit.Before
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import rx.plugins.RxJavaHooks import rx.plugins.RxJavaHooks
import java.io.StringWriter
import java.util.concurrent.Semaphore
import java.time.Clock import java.time.Clock
import java.time.Instant import java.time.Instant
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -381,47 +380,14 @@ class DBTransactionStorageTests {
val signedTransaction = newTransaction() val signedTransaction = newTransaction()
// Act // Act
val logMessages = collectLogsFrom { val warning = database.transaction {
database.transaction { val (result, warning) = transactionStorage.trackTransactionInternal(signedTransaction.id)
val result = transactionStorage.trackTransaction(signedTransaction.id) result.cancel(false)
result.cancel(false) warning
}
} }
// Assert // Assert
assertThat(logMessages).contains("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") assertThat(warning).isEqualTo(DBTransactionStorage.TRANSACTION_ALREADY_IN_PROGRESS_WARNING)
}
private fun collectLogsFrom(statement: () -> Unit): String {
// Create test appender
val stringWriter = StringWriter()
val appenderName = this::collectLogsFrom.name
val appender: Appender = WriterAppender.createAppender(
null,
null,
stringWriter,
appenderName,
false,
true
)
appender.start()
// Add test appender
val context = LogManager.getContext(false) as LoggerContext
val configuration = context.configuration
configuration.addAppender(appender)
configuration.loggers.values.forEach { it.addAppender(appender, null, null) }
try {
statement()
} finally {
// Remove test appender
configuration.loggers.values.forEach { it.removeAppender(appenderName) }
configuration.appenders.remove(appenderName)
appender.stop()
}
return stringWriter.toString()
} }
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) { private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) {