CORDA-924 Fix IntegrationTestingTutorial (#2349)

i.e. H2 startup bug. also:
* Fix poor connection pool discipline
* Log cleanup failures instead of replacing foreground failure
This commit is contained in:
Andrzej Cichocki 2018-01-17 14:23:13 +00:00 committed by GitHub
parent a19b213d7d
commit ea57639a37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 183 additions and 30 deletions

View File

@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.persistence package net.corda.nodeapi.internal.persistence
import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import rx.Observable import rx.Observable
import rx.Subscriber import rx.Subscriber
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
@ -44,6 +45,10 @@ class CordaPersistence(
schemas: Set<MappedSchema>, schemas: Set<MappedSchema>,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet() attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
) : Closeable { ) : Closeable {
companion object {
private val log = contextLogger()
}
val defaultIsolationLevel = databaseConfig.transactionIsolationLevel val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
val hibernateConfig: HibernateConfiguration by lazy { val hibernateConfig: HibernateConfiguration by lazy {
transaction { transaction {
@ -56,9 +61,7 @@ class CordaPersistence(
DatabaseTransactionManager(this) DatabaseTransactionManager(this)
// Check not in read-only mode. // Check not in read-only mode.
transaction { transaction {
dataSource.connection.use { check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
}
} }
} }
@ -90,7 +93,7 @@ class CordaPersistence(
*/ */
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T { fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
DatabaseTransactionManager.dataSource = this DatabaseTransactionManager.dataSource = this
return transaction(isolationLevel, 3, statement) return transaction(isolationLevel, 2, statement)
} }
/** /**
@ -99,17 +102,22 @@ class CordaPersistence(
*/ */
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement) fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T { private fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
val outer = DatabaseTransactionManager.currentOrNull() val outer = DatabaseTransactionManager.currentOrNull()
return if (outer != null) { return if (outer != null) {
outer.statement() outer.statement()
} else { } else {
inTopLevelTransaction(isolationLevel, repetitionAttempts, statement) inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, statement)
} }
} }
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T { private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
var repetitions = 0 var recoverableFailureCount = 0
fun <T> quietly(task: () -> T) = try {
task()
} catch (t: Throwable) {
log.warn("Cleanup task failed:", t)
}
while (true) { while (true) {
val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel) val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel)
try { try {
@ -117,16 +125,14 @@ class CordaPersistence(
transaction.commit() transaction.commit()
return answer return answer
} catch (e: SQLException) { } catch (e: SQLException) {
transaction.rollback() quietly(transaction::rollback)
repetitions++ if (++recoverableFailureCount > recoverableFailureTolerance) throw e
if (repetitions >= repetitionAttempts) { log.warn("Caught failure, will retry:", e)
throw e
}
} catch (e: Throwable) { } catch (e: Throwable) {
transaction.rollback() quietly(transaction::rollback)
throw e throw e
} finally { } finally {
transaction.close() quietly(transaction::close)
} }
} }
} }

View File

@ -9,6 +9,7 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.DataSourceFactory
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions import net.corda.node.services.Permissions
@ -22,8 +23,9 @@ import org.junit.Before
import org.junit.Test import org.junit.Test
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import java.sql.DriverManager
import java.sql.Statement import java.sql.Statement
import java.util.*
import javax.sql.DataSource
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
/* /*
@ -268,8 +270,9 @@ private class UsersDB : AutoCloseable {
} }
} }
private val dataSource: DataSource
inline private fun session(statement: (Statement) -> Unit) { inline private fun session(statement: (Statement) -> Unit) {
DriverManager.getConnection(jdbcUrl).use { dataSource.connection.use {
it.autoCommit = false it.autoCommit = false
it.createStatement().use(statement) it.createStatement().use(statement)
it.commit() it.commit()
@ -281,7 +284,10 @@ private class UsersDB : AutoCloseable {
roleAndPermissions: List<RoleAndPermissions> = emptyList()) { roleAndPermissions: List<RoleAndPermissions> = emptyList()) {
jdbcUrl = "jdbc:h2:mem:${name};DB_CLOSE_DELAY=-1" jdbcUrl = "jdbc:h2:mem:${name};DB_CLOSE_DELAY=-1"
dataSource = DataSourceFactory.createDataSource(Properties().apply {
put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
put("dataSource.url", jdbcUrl)
}, false)
session { session {
it.execute(DB_CREATE_SCHEMA) it.execute(DB_CREATE_SCHEMA)
} }
@ -295,7 +301,7 @@ private class UsersDB : AutoCloseable {
} }
override fun close() { override fun close() {
DriverManager.getConnection(jdbcUrl).use { dataSource.connection.use {
it.createStatement().use { it.createStatement().use {
it.execute("DROP ALL OBJECTS") it.execute("DROP ALL OBJECTS")
} }

View File

@ -3,8 +3,6 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.google.common.collect.MutableClassToInstanceMap import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.MoreExecutors
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.confidential.SwapIdentitiesFlow import net.corda.confidential.SwapIdentitiesFlow
import net.corda.confidential.SwapIdentitiesHandler import net.corda.confidential.SwapIdentitiesHandler
import net.corda.core.CordaException import net.corda.core.CordaException
@ -612,9 +610,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
if (props.isNotEmpty()) { if (props.isNotEmpty()) {
val database = configureDatabase(props, configuration.database, identityService, schemaService) val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly. // Now log the vendor string as this will also cause a connection to be tested eagerly.
database.transaction { logVendorString(database, log)
log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.")
}
runOnStop += database::close runOnStop += database::close
return database.transaction { return database.transaction {
insideTransaction(database) insideTransaction(database)
@ -818,6 +814,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
} }
@VisibleForTesting
internal fun logVendorString(database: CordaPersistence, log: Logger) {
database.transaction {
log.info("Connected to ${connection.metaData.databaseProductName} database.")
}
}
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> { override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> {
return serverThread.fetchFrom { smm.startFlow(logic, context) } return serverThread.fetchFrom { smm.startFlow(logic, context) }
@ -840,7 +843,7 @@ class ConfigurationException(message: String) : CordaException(message)
*/ */
internal class NetworkMapCacheEmptyException : Exception() internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(dataSourceProperties: Properties, fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig, databaseConfig: DatabaseConfig,
identityService: IdentityService, identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence { schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
@ -849,8 +852,7 @@ fun configureDatabase(dataSourceProperties: Properties,
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else. // either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService)) JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
val config = HikariConfig(dataSourceProperties) val dataSource = DataSourceFactory.createDataSource(hikariProperties)
val dataSource = HikariDataSource(config)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService)) val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters) return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
} }

View File

@ -0,0 +1,46 @@
package net.corda.node.internal
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import com.zaxxer.hikari.util.PropertyElf
import net.corda.core.internal.declaredField
import org.h2.engine.Database
import org.h2.engine.Engine
import org.slf4j.LoggerFactory
import java.lang.reflect.Modifier
import java.util.*
import javax.sql.DataSource
object DataSourceFactory {
/** H2 only uses get/put/remove on [Engine.DATABASES], and it has to be a [HashMap]. */
private class SynchronizedGetPutRemove<K, V> : HashMap<K, V>() {
@Synchronized
override fun get(key: K) = super.get(key)
@Synchronized
override fun put(key: K, value: V) = super.put(key, value)
@Synchronized
override fun remove(key: K) = super.remove(key)
}
init {
LoggerFactory.getLogger(javaClass).debug("Applying H2 fix.") // See CORDA-924.
Engine::class.java.getDeclaredField("DATABASES").apply {
isAccessible = true
declaredField<Int>("modifiers").apply { value = value and Modifier.FINAL.inv() }
}.set(null, SynchronizedGetPutRemove<String, Database>())
}
fun createDataSource(hikariProperties: Properties, pool: Boolean = true): DataSource {
val config = HikariConfig(hikariProperties)
return if (pool) {
HikariDataSource(config)
} else {
// Basic init for the one test that wants to go via this API but without starting a HikariPool:
(Class.forName(hikariProperties.getProperty("dataSourceClassName")).newInstance() as DataSource).also {
PropertyElf.setTargetFromProperties(it, config.dataSourceProperties)
}
}
}
}

View File

@ -3,10 +3,9 @@ package net.corda.node.internal.security
import com.google.common.cache.CacheBuilder import com.google.common.cache.CacheBuilder
import com.google.common.cache.Cache import com.google.common.cache.Cache
import com.google.common.primitives.Ints import com.google.common.primitives.Ints
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.context.AuthServiceId import net.corda.core.context.AuthServiceId
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.internal.DataSourceFactory
import net.corda.node.services.config.PasswordEncryption import net.corda.node.services.config.PasswordEncryption
import net.corda.node.services.config.SecurityConfiguration import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.AuthDataSourceType import net.corda.node.services.config.AuthDataSourceType
@ -246,7 +245,7 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource
init { init {
credentialsMatcher = buildCredentialMatcher(config.passwordEncryption) credentialsMatcher = buildCredentialMatcher(config.passwordEncryption)
setPermissionsLookupEnabled(true) setPermissionsLookupEnabled(true)
dataSource = HikariDataSource(HikariConfig(config.connection!!)) dataSource = DataSourceFactory.createDataSource(config.connection!!)
permissionResolver = RPCPermissionResolver permissionResolver = RPCPermissionResolver
} }

View File

@ -0,0 +1,91 @@
package net.corda.node.internal
import com.nhaarman.mockito_kotlin.mock
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.relaxedThoroughness
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.internal.ProcessUtilities.startJavaProcess
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.slf4j.Logger
import java.io.File
import java.util.*
import java.util.concurrent.Executors
import kotlin.test.assertEquals
class AbstractNodeTests {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private var nextNodeIndex = 0
private fun freshURL(): String {
val baseDir = File(temporaryFolder.root, nextNodeIndex++.toString())
// Problems originally exposed by driver startNodesInProcess, so do what driver does:
return "jdbc:h2:file:$baseDir/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT=0"
}
@Test
fun `logVendorString does not leak connection`() {
// Note this test also covers a transaction that CordaPersistence does while it's instantiating:
val database = configureDatabase(hikariProperties(freshURL()), DatabaseConfig(), rigorousMock())
val log = mock<Logger>() // Don't care what happens here.
// Actually 10 is enough to reproduce old code hang, as pool size is 10 and we leaked 9 connections and 1 is in flight:
repeat(100) {
logVendorString(database, log)
}
}
@Test
fun `H2 fix is applied`() {
repeat(if (relaxedThoroughness) 1 else 100) {
// Two "nodes" seems to be the magic number to reproduce the problem:
val urls = (0 until 2).map { freshURL() }
// Haven't been able to reproduce in a warm JVM:
assertEquals(0, startJavaProcess<ColdJVM>(urls).waitFor())
}
}
}
private fun hikariProperties(url: String) = Properties().apply {
put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
put("dataSource.url", url)
put("dataSource.user", "sa")
put("dataSource.password", "")
}
class ColdJVM {
companion object {
private val log = contextLogger()
private val pool = Executors.newCachedThreadPool()
@JvmStatic
fun main(urls: Array<String>) {
val f = urls.map {
pool.fork { startNode(it) } // Like driver starting in-process nodes in parallel.
}.transpose()
try {
f.getOrThrow()
System.exit(0) // Kill non-daemon threads.
} catch (t: Throwable) {
log.error("H2 fix did not work:", t)
System.exit(1)
}
}
private fun startNode(url: String) {
// Go via the API to trigger the fix, but don't allow a HikariPool to interfere:
val dataSource = DataSourceFactory.createDataSource(hikariProperties(url), false)
assertEquals("org.h2.jdbcx.JdbcDataSource", dataSource.javaClass.name)
// Like HikariPool.checkFailFast, immediately after which the Database is removed from Engine.DATABASES:
dataSource.connection.use { log.info("Foreground connection: {}", it) }
pool.fork {
// Like logVendorString, which is done via the connection pool:
dataSource.connection.use { log.info("Background connection: {}", it) }
}.getOrThrow()
}
}
}

View File

@ -0,0 +1,3 @@
package net.corda.testing.common.internal
val relaxedThoroughness = System.getenv("TEAMCITY_PROJECT_NAME") == "Pull Requests"