From ea57639a378fa45f0aef6930bc4450d0bbfaa5f4 Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Wed, 17 Jan 2018 14:23:13 +0000 Subject: [PATCH] CORDA-924 Fix IntegrationTestingTutorial (#2349) i.e. H2 startup bug. also: * Fix poor connection pool discipline * Log cleanup failures instead of replacing foreground failure --- .../internal/persistence/CordaPersistence.kt | 36 +++++--- .../kotlin/net/corda/node/AuthDBTests.kt | 14 ++- .../net/corda/node/internal/AbstractNode.kt | 18 ++-- .../corda/node/internal/DataSourceFactory.kt | 46 ++++++++++ .../security/RPCSecurityManagerImpl.kt | 5 +- .../corda/node/internal/AbstractNodeTests.kt | 91 +++++++++++++++++++ .../testing/common/internal/Thoroughness.kt | 3 + 7 files changed, 183 insertions(+), 30 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/internal/DataSourceFactory.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/AbstractNodeTests.kt create mode 100644 testing/test-common/src/main/kotlin/net/corda/testing/common/internal/Thoroughness.kt diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 146bd4308d..28674fd1e7 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -1,6 +1,7 @@ package net.corda.nodeapi.internal.persistence import net.corda.core.schemas.MappedSchema +import net.corda.core.utilities.contextLogger import rx.Observable import rx.Subscriber import rx.subjects.UnicastSubject @@ -44,6 +45,10 @@ class CordaPersistence( schemas: Set, attributeConverters: Collection> = emptySet() ) : Closeable { + companion object { + private val log = contextLogger() + } + val defaultIsolationLevel = databaseConfig.transactionIsolationLevel val hibernateConfig: HibernateConfiguration by lazy { transaction { @@ -56,9 +61,7 @@ class CordaPersistence( DatabaseTransactionManager(this) // Check not in read-only mode. transaction { - dataSource.connection.use { - check(!it.metaData.isReadOnly) { "Database should not be readonly." } - } + check(!connection.metaData.isReadOnly) { "Database should not be readonly." } } } @@ -90,7 +93,7 @@ class CordaPersistence( */ fun transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T { DatabaseTransactionManager.dataSource = this - return transaction(isolationLevel, 3, statement) + return transaction(isolationLevel, 2, statement) } /** @@ -99,17 +102,22 @@ class CordaPersistence( */ fun transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement) - private fun transaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T { + private fun transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T { val outer = DatabaseTransactionManager.currentOrNull() return if (outer != null) { outer.statement() } else { - inTopLevelTransaction(isolationLevel, repetitionAttempts, statement) + inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, statement) } } - private fun inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T { - var repetitions = 0 + private fun inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T { + var recoverableFailureCount = 0 + fun quietly(task: () -> T) = try { + task() + } catch (t: Throwable) { + log.warn("Cleanup task failed:", t) + } while (true) { val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel) try { @@ -117,16 +125,14 @@ class CordaPersistence( transaction.commit() return answer } catch (e: SQLException) { - transaction.rollback() - repetitions++ - if (repetitions >= repetitionAttempts) { - throw e - } + quietly(transaction::rollback) + if (++recoverableFailureCount > recoverableFailureTolerance) throw e + log.warn("Caught failure, will retry:", e) } catch (e: Throwable) { - transaction.rollback() + quietly(transaction::rollback) throw e } finally { - transaction.close() + quietly(transaction::close) } } } diff --git a/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt b/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt index 50689e84f5..1c61915e40 100644 --- a/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt @@ -9,6 +9,7 @@ import net.corda.core.flows.StartableByRPC import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.finance.flows.CashIssueFlow +import net.corda.node.internal.DataSourceFactory import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.services.Permissions @@ -22,8 +23,9 @@ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import java.sql.DriverManager import java.sql.Statement +import java.util.* +import javax.sql.DataSource import kotlin.test.assertFailsWith /* @@ -268,8 +270,9 @@ private class UsersDB : AutoCloseable { } } + private val dataSource: DataSource inline private fun session(statement: (Statement) -> Unit) { - DriverManager.getConnection(jdbcUrl).use { + dataSource.connection.use { it.autoCommit = false it.createStatement().use(statement) it.commit() @@ -281,7 +284,10 @@ private class UsersDB : AutoCloseable { roleAndPermissions: List = emptyList()) { jdbcUrl = "jdbc:h2:mem:${name};DB_CLOSE_DELAY=-1" - + dataSource = DataSourceFactory.createDataSource(Properties().apply { + put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource") + put("dataSource.url", jdbcUrl) + }, false) session { it.execute(DB_CREATE_SCHEMA) } @@ -295,7 +301,7 @@ private class UsersDB : AutoCloseable { } override fun close() { - DriverManager.getConnection(jdbcUrl).use { + dataSource.connection.use { it.createStatement().use { it.execute("DROP ALL OBJECTS") } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index e9ffeb0dcf..94f864a55e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -3,8 +3,6 @@ package net.corda.node.internal import com.codahale.metrics.MetricRegistry import com.google.common.collect.MutableClassToInstanceMap import com.google.common.util.concurrent.MoreExecutors -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource import net.corda.confidential.SwapIdentitiesFlow import net.corda.confidential.SwapIdentitiesHandler import net.corda.core.CordaException @@ -612,9 +610,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, if (props.isNotEmpty()) { val database = configureDatabase(props, configuration.database, identityService, schemaService) // Now log the vendor string as this will also cause a connection to be tested eagerly. - database.transaction { - log.info("Connected to ${database.dataSource.connection.metaData.databaseProductName} database.") - } + logVendorString(database, log) runOnStop += database::close return database.transaction { insideTransaction(database) @@ -818,6 +814,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } +@VisibleForTesting +internal fun logVendorString(database: CordaPersistence, log: Logger) { + database.transaction { + log.info("Connected to ${connection.metaData.databaseProductName} database.") + } +} + internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter { override fun startFlow(logic: FlowLogic, context: InvocationContext): CordaFuture> { return serverThread.fetchFrom { smm.startFlow(logic, context) } @@ -840,7 +843,7 @@ class ConfigurationException(message: String) : CordaException(message) */ internal class NetworkMapCacheEmptyException : Exception() -fun configureDatabase(dataSourceProperties: Properties, +fun configureDatabase(hikariProperties: Properties, databaseConfig: DatabaseConfig, identityService: IdentityService, schemaService: SchemaService = NodeSchemaService()): CordaPersistence { @@ -849,8 +852,7 @@ fun configureDatabase(dataSourceProperties: Properties, // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if // either Hibernate can be convinced to stop warning, use the descriptor by default, or something else. JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService)) - val config = HikariConfig(dataSourceProperties) - val dataSource = HikariDataSource(config) + val dataSource = DataSourceFactory.createDataSource(hikariProperties) val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService)) return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters) } diff --git a/node/src/main/kotlin/net/corda/node/internal/DataSourceFactory.kt b/node/src/main/kotlin/net/corda/node/internal/DataSourceFactory.kt new file mode 100644 index 0000000000..d4cad1ff10 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/DataSourceFactory.kt @@ -0,0 +1,46 @@ +package net.corda.node.internal + +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import com.zaxxer.hikari.util.PropertyElf +import net.corda.core.internal.declaredField +import org.h2.engine.Database +import org.h2.engine.Engine +import org.slf4j.LoggerFactory +import java.lang.reflect.Modifier +import java.util.* +import javax.sql.DataSource + +object DataSourceFactory { + /** H2 only uses get/put/remove on [Engine.DATABASES], and it has to be a [HashMap]. */ + private class SynchronizedGetPutRemove : HashMap() { + @Synchronized + override fun get(key: K) = super.get(key) + + @Synchronized + override fun put(key: K, value: V) = super.put(key, value) + + @Synchronized + override fun remove(key: K) = super.remove(key) + } + + init { + LoggerFactory.getLogger(javaClass).debug("Applying H2 fix.") // See CORDA-924. + Engine::class.java.getDeclaredField("DATABASES").apply { + isAccessible = true + declaredField("modifiers").apply { value = value and Modifier.FINAL.inv() } + }.set(null, SynchronizedGetPutRemove()) + } + + fun createDataSource(hikariProperties: Properties, pool: Boolean = true): DataSource { + val config = HikariConfig(hikariProperties) + return if (pool) { + HikariDataSource(config) + } else { + // Basic init for the one test that wants to go via this API but without starting a HikariPool: + (Class.forName(hikariProperties.getProperty("dataSourceClassName")).newInstance() as DataSource).also { + PropertyElf.setTargetFromProperties(it, config.dataSourceProperties) + } + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt index 3b1f28dc38..8be003ab0d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt @@ -3,10 +3,9 @@ package net.corda.node.internal.security import com.google.common.cache.CacheBuilder import com.google.common.cache.Cache import com.google.common.primitives.Ints -import com.zaxxer.hikari.HikariConfig -import com.zaxxer.hikari.HikariDataSource import net.corda.core.context.AuthServiceId import net.corda.core.utilities.loggerFor +import net.corda.node.internal.DataSourceFactory import net.corda.node.services.config.PasswordEncryption import net.corda.node.services.config.SecurityConfiguration import net.corda.node.services.config.AuthDataSourceType @@ -246,7 +245,7 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource init { credentialsMatcher = buildCredentialMatcher(config.passwordEncryption) setPermissionsLookupEnabled(true) - dataSource = HikariDataSource(HikariConfig(config.connection!!)) + dataSource = DataSourceFactory.createDataSource(config.connection!!) permissionResolver = RPCPermissionResolver } diff --git a/node/src/test/kotlin/net/corda/node/internal/AbstractNodeTests.kt b/node/src/test/kotlin/net/corda/node/internal/AbstractNodeTests.kt new file mode 100644 index 0000000000..5dc2f28844 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/AbstractNodeTests.kt @@ -0,0 +1,91 @@ +package net.corda.node.internal + +import com.nhaarman.mockito_kotlin.mock +import net.corda.core.internal.concurrent.fork +import net.corda.core.internal.concurrent.transpose +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.testing.common.internal.relaxedThoroughness +import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.internal.ProcessUtilities.startJavaProcess +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import org.slf4j.Logger +import java.io.File +import java.util.* +import java.util.concurrent.Executors +import kotlin.test.assertEquals + +class AbstractNodeTests { + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + private var nextNodeIndex = 0 + private fun freshURL(): String { + val baseDir = File(temporaryFolder.root, nextNodeIndex++.toString()) + // Problems originally exposed by driver startNodesInProcess, so do what driver does: + return "jdbc:h2:file:$baseDir/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT=0" + } + + @Test + fun `logVendorString does not leak connection`() { + // Note this test also covers a transaction that CordaPersistence does while it's instantiating: + val database = configureDatabase(hikariProperties(freshURL()), DatabaseConfig(), rigorousMock()) + val log = mock() // Don't care what happens here. + // Actually 10 is enough to reproduce old code hang, as pool size is 10 and we leaked 9 connections and 1 is in flight: + repeat(100) { + logVendorString(database, log) + } + } + + @Test + fun `H2 fix is applied`() { + repeat(if (relaxedThoroughness) 1 else 100) { + // Two "nodes" seems to be the magic number to reproduce the problem: + val urls = (0 until 2).map { freshURL() } + // Haven't been able to reproduce in a warm JVM: + assertEquals(0, startJavaProcess(urls).waitFor()) + } + } +} + +private fun hikariProperties(url: String) = Properties().apply { + put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource") + put("dataSource.url", url) + put("dataSource.user", "sa") + put("dataSource.password", "") +} + +class ColdJVM { + companion object { + private val log = contextLogger() + private val pool = Executors.newCachedThreadPool() + @JvmStatic + fun main(urls: Array) { + val f = urls.map { + pool.fork { startNode(it) } // Like driver starting in-process nodes in parallel. + }.transpose() + try { + f.getOrThrow() + System.exit(0) // Kill non-daemon threads. + } catch (t: Throwable) { + log.error("H2 fix did not work:", t) + System.exit(1) + } + } + + private fun startNode(url: String) { + // Go via the API to trigger the fix, but don't allow a HikariPool to interfere: + val dataSource = DataSourceFactory.createDataSource(hikariProperties(url), false) + assertEquals("org.h2.jdbcx.JdbcDataSource", dataSource.javaClass.name) + // Like HikariPool.checkFailFast, immediately after which the Database is removed from Engine.DATABASES: + dataSource.connection.use { log.info("Foreground connection: {}", it) } + pool.fork { + // Like logVendorString, which is done via the connection pool: + dataSource.connection.use { log.info("Background connection: {}", it) } + }.getOrThrow() + } + } +} diff --git a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/Thoroughness.kt b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/Thoroughness.kt new file mode 100644 index 0000000000..298eaa5660 --- /dev/null +++ b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/Thoroughness.kt @@ -0,0 +1,3 @@ +package net.corda.testing.common.internal + +val relaxedThoroughness = System.getenv("TEAMCITY_PROJECT_NAME") == "Pull Requests"