Merge commit 'ea57639a378fa45f0aef6930bc4450d0bbfaa5f4' into andr3ej-entbot

# Conflicts:
#	node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
This commit is contained in:
Andrzej Cichocki 2018-01-18 11:15:14 +00:00
commit 42e1a38c6b
No known key found for this signature in database
GPG Key ID: 21B3BCB0BD5B0832
10 changed files with 282 additions and 36 deletions

View File

@ -361,3 +361,30 @@ artifactory {
task generateApi(type: net.corda.plugins.GenerateApi){
baseName = "api-corda"
}
// This exists to reduce CI build time when the envvar is set (can save up to 40 minutes)
if(System.getenv('CORDA_DOCS_ONLY_BUILD') != null) {
logger.info("Tests are disabled due to presence of envvar CORDA_DOCS_ONLY_BUILD")
allprojects {
test {
exclude '*/**'
}
it.afterEvaluate {
if(it.tasks.findByName("integrationTest") != null) {
integrationTest {
exclude '*/**'
}
}
}
it.afterEvaluate {
if(it.tasks.findByName("smokeTest") != null) {
smokeTest {
exclude '*/**'
}
}
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.persistence
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import rx.Observable
import rx.Subscriber
import rx.subjects.UnicastSubject
@ -46,6 +47,10 @@ class CordaPersistence(
val jdbcUrl: String,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
) : Closeable {
companion object {
private val log = contextLogger()
}
val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
val hibernateConfig: HibernateConfiguration by lazy {
@ -59,9 +64,7 @@ class CordaPersistence(
DatabaseTransactionManager(this)
// Check not in read-only mode.
transaction {
dataSource.connection.use {
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
}
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
}
}
@ -97,7 +100,7 @@ class CordaPersistence(
*/
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
DatabaseTransactionManager.dataSource = this
return transaction(isolationLevel, 3, statement)
return transaction(isolationLevel, 2, statement)
}
/**
@ -106,17 +109,22 @@ class CordaPersistence(
*/
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
val outer = DatabaseTransactionManager.currentOrNull()
return if (outer != null) {
outer.statement()
} else {
inTopLevelTransaction(isolationLevel, repetitionAttempts, statement)
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, statement)
}
}
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
var repetitions = 0
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, recoverableFailureTolerance: Int, statement: DatabaseTransaction.() -> T): T {
var recoverableFailureCount = 0
fun <T> quietly(task: () -> T) = try {
task()
} catch (t: Throwable) {
log.warn("Cleanup task failed:", t)
}
while (true) {
val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel)
try {
@ -124,16 +132,14 @@ class CordaPersistence(
transaction.commit()
return answer
} catch (e: SQLException) {
transaction.rollback()
repetitions++
if (repetitions >= repetitionAttempts) {
throw e
}
quietly(transaction::rollback)
if (++recoverableFailureCount > recoverableFailureTolerance) throw e
log.warn("Caught failure, will retry:", e)
} catch (e: Throwable) {
transaction.rollback()
quietly(transaction::rollback)
throw e
} finally {
transaction.close()
quietly(transaction::close)
}
}
}

View File

@ -9,6 +9,7 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.DataSourceFactory
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
@ -25,8 +26,9 @@ import org.junit.ClassRule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.sql.DriverManager
import java.sql.Statement
import java.util.*
import javax.sql.DataSource
import kotlin.test.assertFailsWith
/*
@ -273,8 +275,9 @@ private class UsersDB : AutoCloseable {
}
}
private val dataSource: DataSource
inline private fun session(statement: (Statement) -> Unit) {
DriverManager.getConnection(jdbcUrl).use {
dataSource.connection.use {
it.autoCommit = false
it.createStatement().use(statement)
it.commit()
@ -286,7 +289,10 @@ private class UsersDB : AutoCloseable {
roleAndPermissions: List<RoleAndPermissions> = emptyList()) {
jdbcUrl = "jdbc:h2:mem:${name};DB_CLOSE_DELAY=-1"
dataSource = DataSourceFactory.createDataSource(Properties().apply {
put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
put("dataSource.url", jdbcUrl)
}, false)
session {
it.execute(DB_CREATE_SCHEMA)
}
@ -300,7 +306,7 @@ private class UsersDB : AutoCloseable {
}
override fun close() {
DriverManager.getConnection(jdbcUrl).use {
dataSource.connection.use {
it.createStatement().use {
it.execute("DROP ALL OBJECTS")
}

View File

@ -12,7 +12,6 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sign
import net.corda.core.flows.*
@ -635,9 +634,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
if (props.isNotEmpty()) {
val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
database.transaction {
log.info("Connected to ${connection.metaData.databaseProductName} database.")
}
logVendorString(database, log)
runOnStop += database::close
return database.transaction {
insideTransaction(database)
@ -846,6 +843,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
@VisibleForTesting
internal fun logVendorString(database: CordaPersistence, log: Logger) {
database.transaction {
log.info("Connected to ${connection.metaData.databaseProductName} database.")
}
}
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(logic: FlowLogic<T>, context: InvocationContext): CordaFuture<FlowStateMachine<T>> {
return serverThread.fetchFrom { smm.startFlow(logic, context) }
@ -868,7 +872,7 @@ class ConfigurationException(message: String) : CordaException(message)
*/
internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(dataSourceProperties: Properties,
fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig,
identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
@ -877,12 +881,10 @@ fun configureDatabase(dataSourceProperties: Properties,
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val dataSource = DataSourceFactory.createDataSource(hikariProperties)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
val jdbcUrl = config.dataSourceProperties.getProperty("url", "")
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
if (databaseConfig.runMigration) {
SchemaMigration(schemaService.schemaOptions.keys, dataSource, !isH2Database(jdbcUrl), databaseConfig).runMigration()

View File

@ -0,0 +1,46 @@
package net.corda.node.internal
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import com.zaxxer.hikari.util.PropertyElf
import net.corda.core.internal.declaredField
import org.h2.engine.Database
import org.h2.engine.Engine
import org.slf4j.LoggerFactory
import java.lang.reflect.Modifier
import java.util.*
import javax.sql.DataSource
object DataSourceFactory {
/** H2 only uses get/put/remove on [Engine.DATABASES], and it has to be a [HashMap]. */
private class SynchronizedGetPutRemove<K, V> : HashMap<K, V>() {
@Synchronized
override fun get(key: K) = super.get(key)
@Synchronized
override fun put(key: K, value: V) = super.put(key, value)
@Synchronized
override fun remove(key: K) = super.remove(key)
}
init {
LoggerFactory.getLogger(javaClass).debug("Applying H2 fix.") // See CORDA-924.
Engine::class.java.getDeclaredField("DATABASES").apply {
isAccessible = true
declaredField<Int>("modifiers").apply { value = value and Modifier.FINAL.inv() }
}.set(null, SynchronizedGetPutRemove<String, Database>())
}
fun createDataSource(hikariProperties: Properties, pool: Boolean = true): DataSource {
val config = HikariConfig(hikariProperties)
return if (pool) {
HikariDataSource(config)
} else {
// Basic init for the one test that wants to go via this API but without starting a HikariPool:
(Class.forName(hikariProperties.getProperty("dataSourceClassName")).newInstance() as DataSource).also {
PropertyElf.setTargetFromProperties(it, config.dataSourceProperties)
}
}
}
}

View File

@ -3,10 +3,9 @@ package net.corda.node.internal.security
import com.google.common.cache.CacheBuilder
import com.google.common.cache.Cache
import com.google.common.primitives.Ints
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.context.AuthServiceId
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.DataSourceFactory
import net.corda.node.services.config.PasswordEncryption
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.AuthDataSourceType
@ -246,7 +245,7 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource
init {
credentialsMatcher = buildCredentialMatcher(config.passwordEncryption)
setPermissionsLookupEnabled(true)
dataSource = HikariDataSource(HikariConfig(config.connection!!))
dataSource = DataSourceFactory.createDataSource(config.connection!!)
permissionResolver = RPCPermissionResolver
}

View File

@ -0,0 +1,91 @@
package net.corda.node.internal
import com.nhaarman.mockito_kotlin.mock
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.relaxedThoroughness
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.internal.ProcessUtilities.startJavaProcess
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import org.slf4j.Logger
import java.io.File
import java.util.*
import java.util.concurrent.Executors
import kotlin.test.assertEquals
class AbstractNodeTests {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private var nextNodeIndex = 0
private fun freshURL(): String {
val baseDir = File(temporaryFolder.root, nextNodeIndex++.toString())
// Problems originally exposed by driver startNodesInProcess, so do what driver does:
return "jdbc:h2:file:$baseDir/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT=0"
}
@Test
fun `logVendorString does not leak connection`() {
// Note this test also covers a transaction that CordaPersistence does while it's instantiating:
val database = configureDatabase(hikariProperties(freshURL()), DatabaseConfig(), rigorousMock())
val log = mock<Logger>() // Don't care what happens here.
// Actually 10 is enough to reproduce old code hang, as pool size is 10 and we leaked 9 connections and 1 is in flight:
repeat(100) {
logVendorString(database, log)
}
}
@Test
fun `H2 fix is applied`() {
repeat(if (relaxedThoroughness) 1 else 100) {
// Two "nodes" seems to be the magic number to reproduce the problem:
val urls = (0 until 2).map { freshURL() }
// Haven't been able to reproduce in a warm JVM:
assertEquals(0, startJavaProcess<ColdJVM>(urls).waitFor())
}
}
}
private fun hikariProperties(url: String) = Properties().apply {
put("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
put("dataSource.url", url)
put("dataSource.user", "sa")
put("dataSource.password", "")
}
class ColdJVM {
companion object {
private val log = contextLogger()
private val pool = Executors.newCachedThreadPool()
@JvmStatic
fun main(urls: Array<String>) {
val f = urls.map {
pool.fork { startNode(it) } // Like driver starting in-process nodes in parallel.
}.transpose()
try {
f.getOrThrow()
System.exit(0) // Kill non-daemon threads.
} catch (t: Throwable) {
log.error("H2 fix did not work:", t)
System.exit(1)
}
}
private fun startNode(url: String) {
// Go via the API to trigger the fix, but don't allow a HikariPool to interfere:
val dataSource = DataSourceFactory.createDataSource(hikariProperties(url), false)
assertEquals("org.h2.jdbcx.JdbcDataSource", dataSource.javaClass.name)
// Like HikariPool.checkFailFast, immediately after which the Database is removed from Engine.DATABASES:
dataSource.connection.use { log.info("Foreground connection: {}", it) }
pool.fork {
// Like logVendorString, which is done via the connection pool:
dataSource.connection.use { log.info("Background connection: {}", it) }
}.getOrThrow()
}
}
}

View File

@ -0,0 +1,3 @@
package net.corda.testing.common.internal
val relaxedThoroughness = System.getenv("TEAMCITY_PROJECT_NAME") == "Pull Requests"

View File

@ -1,6 +1,9 @@
package net.corda.webserver.api
import net.corda.core.node.NodeInfo
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.identity.Party
import net.corda.core.utilities.NetworkHostAndPort
import java.time.LocalDateTime
import javax.ws.rs.GET
import javax.ws.rs.Path
@ -34,10 +37,59 @@ interface APIServer {
fun status(): Response
/**
* Report this node's configuration and identities.
* Report this node's addresses.
*/
@GET
@Path("info")
@Path("addresses")
@Produces(MediaType.APPLICATION_JSON)
fun info(): NodeInfo
fun addresses(): List<NetworkHostAndPort>
/**
* Report this node's legal identities.
*/
@GET
@Path("identities")
@Produces(MediaType.APPLICATION_JSON)
fun identities(): List<Party>
/**
* Report this node's platform version.
*/
@GET
@Path("platformversion")
@Produces(MediaType.APPLICATION_JSON)
fun platformVersion(): Int
/**
* Report the peers on the network.
*/
@GET
@Path("peers")
@Produces(MediaType.APPLICATION_JSON)
fun peers(): List<Party>
/**
* Report the notaries on the network.
*/
@GET
@Path("notaries")
@Produces(MediaType.APPLICATION_JSON)
fun notaries(): List<Party>
/**
* Report this node's registered flows.
*/
@GET
@Path("flows")
@Produces(MediaType.APPLICATION_JSON)
fun flows(): List<String>
/**
* Report this node's vault states.
*/
@GET
@Path("states")
@Produces(MediaType.APPLICATION_JSON)
fun states(): List<StateAndRef<ContractState>>
}

View File

@ -1,6 +1,8 @@
package net.corda.webserver.internal
import net.corda.core.contracts.ContractState
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.vaultQueryBy
import net.corda.webserver.api.APIServer
import java.time.LocalDateTime
import java.time.ZoneId
@ -18,5 +20,17 @@ class APIServerImpl(val rpcOps: CordaRPCOps) : APIServer {
return Response.ok("started").build()
}
override fun info() = rpcOps.nodeInfo()
override fun addresses() = rpcOps.nodeInfo().addresses
override fun identities() = rpcOps.nodeInfo().legalIdentities
override fun platformVersion() = rpcOps.nodeInfo().platformVersion
override fun peers() = rpcOps.networkMapSnapshot().flatMap { it.legalIdentities }
override fun notaries() = rpcOps.notaryIdentities()
override fun flows() = rpcOps.registeredFlows()
override fun states() = rpcOps.vaultQueryBy<ContractState>().states
}