mirror of
https://github.com/corda/corda.git
synced 2025-01-26 14:19:23 +00:00
decouple Exposed (#1028)
Exposed library decoupled from transaction management and JDBC connection creation for Hibernate and ReQuery
This commit is contained in:
parent
1996c39b9a
commit
05327f3826
@ -16,7 +16,6 @@ import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.flows.CashIssueFlow
|
||||
import net.corda.node.internal.CordaRPCOpsImpl
|
||||
import net.corda.node.services.startFlowPermission
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.RPCDriverExposedDSLInterface
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
|
@ -7,7 +7,6 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.utilities.opaque
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.testing.DUMMY_NOTARY_KEY
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MEGA_CORP_KEY
|
||||
import net.corda.testing.MINI_CORP
|
||||
|
@ -18,7 +18,6 @@ import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.statemachine.SessionInit
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -10,7 +10,6 @@ import net.corda.testing.DUMMY_NOTARY_KEY
|
||||
import net.corda.flows.CashIssueFlow
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -14,7 +14,6 @@ import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.DUMMY_NOTARY_KEY
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -11,7 +11,6 @@ import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
@ -212,8 +211,7 @@ class CommercialPaperTestsGeneric {
|
||||
fun `issue move and then redeem`() {
|
||||
|
||||
val dataSourcePropsAlice = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabaseAlice = configureDatabase(dataSourcePropsAlice)
|
||||
val databaseAlice = dataSourceAndDatabaseAlice.second
|
||||
val databaseAlice = configureDatabase(dataSourcePropsAlice)
|
||||
databaseAlice.transaction {
|
||||
|
||||
aliceServices = object : MockServices(ALICE_KEY) {
|
||||
@ -232,8 +230,7 @@ class CommercialPaperTestsGeneric {
|
||||
}
|
||||
|
||||
val dataSourcePropsBigCorp = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabaseBigCorp = configureDatabase(dataSourcePropsBigCorp)
|
||||
val databaseBigCorp = dataSourceAndDatabaseBigCorp.second
|
||||
val databaseBigCorp = configureDatabase(dataSourcePropsBigCorp)
|
||||
databaseBigCorp.transaction {
|
||||
|
||||
bigCorpServices = object : MockServices(BIG_CORP_KEY) {
|
||||
|
@ -14,16 +14,14 @@ import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.MockKeyManagementService
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.security.KeyPair
|
||||
import java.util.*
|
||||
import kotlin.test.*
|
||||
@ -45,17 +43,14 @@ class CashTests {
|
||||
|
||||
lateinit var miniCorpServices: MockServices
|
||||
val vault: VaultService get() = miniCorpServices.vaultService
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var vaultStatesUnconsumed: List<StateAndRef<Cash.State>>
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProps)
|
||||
database.transaction {
|
||||
miniCorpServices = object : MockServices(MINI_CORP_KEY) {
|
||||
override val keyManagementService: MockKeyManagementService = MockKeyManagementService(identityService, MINI_CORP_KEY, MEGA_CORP_KEY, OUR_KEY)
|
||||
|
@ -9,7 +9,6 @@ import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.trackBy
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.expect
|
||||
import net.corda.testing.expectEvents
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
|
||||
|
@ -14,7 +14,6 @@ import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.flows.IssuerFlow.IssuanceRequester
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.calculateRandomlySizedAmounts
|
||||
import net.corda.testing.node.MockNetwork
|
||||
|
@ -21,7 +21,6 @@ import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
||||
import net.corda.node.services.transactions.minClusterSize
|
||||
import net.corda.node.services.transactions.minCorrectReplicas
|
||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
|
@ -11,7 +11,6 @@ import net.corda.core.getOrThrow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.map
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.DUMMY_BANK_A
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
|
@ -12,13 +12,11 @@ import com.google.common.collect.testing.testers.*
|
||||
import junit.framework.TestSuite
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.*
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Suite
|
||||
import java.io.Closeable
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
|
||||
@ -32,9 +30,8 @@ import java.util.*
|
||||
JDBCHashMapTestSuite.SetConstrained::class)
|
||||
class JDBCHashMapTestSuite {
|
||||
companion object {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var loadOnInitFalseMap: JDBCHashMap<String, String>
|
||||
lateinit var memoryConstrainedMap: JDBCHashMap<String, String>
|
||||
lateinit var loadOnInitTrueMap: JDBCHashMap<String, String>
|
||||
@ -45,9 +42,7 @@ class JDBCHashMapTestSuite {
|
||||
@JvmStatic
|
||||
@BeforeClass
|
||||
fun before() {
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
setUpDatabaseTx()
|
||||
loadOnInitFalseMap = JDBCHashMap<String, String>("test_map_false", loadOnInit = false)
|
||||
memoryConstrainedMap = JDBCHashMap<String, String>("test_map_constrained", loadOnInit = false, maxBuckets = 1)
|
||||
@ -61,7 +56,7 @@ class JDBCHashMapTestSuite {
|
||||
@AfterClass
|
||||
fun after() {
|
||||
closeDatabaseTx()
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@ -228,19 +223,16 @@ class JDBCHashMapTestSuite {
|
||||
|
||||
private val transientMapForComparison = applyOpsToMap(LinkedHashMap())
|
||||
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
}
|
||||
|
||||
@After
|
||||
fun after() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
|
||||
|
@ -63,7 +63,6 @@ import net.corda.node.utilities.*
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
import java.io.IOException
|
||||
@ -131,7 +130,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
lateinit var network: MessagingService
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
protected var dbCloser: (() -> Any?)? = null
|
||||
|
||||
var isPreviousCheckpointsPresent = false
|
||||
@ -547,11 +546,12 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
protected open fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isNotEmpty()) {
|
||||
val (toClose, database) = configureDatabase(props)
|
||||
this.database = database
|
||||
this.database = configureDatabase(props)
|
||||
// Now log the vendor string as this will also cause a connection to be tested eagerly.
|
||||
log.info("Connected to ${database.vendor} database.")
|
||||
toClose::close.let {
|
||||
database.transaction {
|
||||
log.info("Connected to ${database.database.vendor} database.")
|
||||
}
|
||||
this.database::close.let {
|
||||
dbCloser = it
|
||||
runOnStop += it
|
||||
}
|
||||
@ -811,7 +811,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
override val clock: Clock get() = platformClock
|
||||
override val myInfo: NodeInfo get() = info
|
||||
override val schemaService by lazy { NodeSchemaService(pluginRegistries.flatMap { it.requiredSchemas }.toSet()) }
|
||||
override val database: Database get() = this@AbstractNode.database
|
||||
override val database: CordaPersistence get() = this@AbstractNode.database
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
|
@ -24,9 +24,8 @@ import net.corda.node.services.messaging.requirePermission
|
||||
import net.corda.node.services.startFlowPermission
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import rx.Observable
|
||||
import java.io.InputStream
|
||||
import java.security.PublicKey
|
||||
@ -40,7 +39,7 @@ import java.util.*
|
||||
class CordaRPCOpsImpl(
|
||||
private val services: ServiceHubInternal,
|
||||
private val smm: StateMachineManager,
|
||||
private val database: Database
|
||||
private val database: CordaPersistence
|
||||
) : CordaRPCOps {
|
||||
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
|
||||
return database.transaction {
|
||||
|
@ -23,7 +23,7 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
|
||||
interface NetworkMapCacheInternal : NetworkMapCache {
|
||||
/**
|
||||
@ -81,7 +81,7 @@ interface ServiceHubInternal : PluginServiceHub {
|
||||
val auditService: AuditService
|
||||
val rpcFlows: List<Class<out FlowLogic<*>>>
|
||||
val networkService: MessagingService
|
||||
val database: Database
|
||||
val database: CordaPersistence
|
||||
val configuration: NodeConfiguration
|
||||
|
||||
@Suppress("DEPRECATION")
|
||||
|
@ -1,9 +1,9 @@
|
||||
package net.corda.node.services.database
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import org.hibernate.SessionFactory
|
||||
import org.hibernate.boot.MetadataSources
|
||||
import org.hibernate.boot.model.naming.Identifier
|
||||
@ -13,7 +13,6 @@ import org.hibernate.cfg.Configuration
|
||||
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
|
||||
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
|
||||
import org.hibernate.service.UnknownUnwrapTypeException
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
@ -94,17 +93,15 @@ class HibernateConfiguration(val schemaService: SchemaService, val useDefaultLog
|
||||
// during schema creation / update.
|
||||
class NodeDatabaseConnectionProvider : ConnectionProvider {
|
||||
override fun closeConnection(conn: Connection) {
|
||||
val tx = TransactionManager.current()
|
||||
val tx = DatabaseTransactionManager.current()
|
||||
tx.commit()
|
||||
tx.close()
|
||||
}
|
||||
|
||||
override fun supportsAggressiveRelease(): Boolean = true
|
||||
|
||||
override fun getConnection(): Connection {
|
||||
val tx = TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
return tx.connection
|
||||
}
|
||||
override fun getConnection(): Connection =
|
||||
DatabaseTransactionManager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection
|
||||
|
||||
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
|
||||
try {
|
||||
|
@ -13,7 +13,7 @@ import net.corda.core.schemas.requery.converters.InstantConverter
|
||||
import net.corda.core.schemas.requery.converters.SecureHashConverter
|
||||
import net.corda.core.schemas.requery.converters.StateRefConverter
|
||||
import net.corda.core.schemas.requery.converters.VaultStateStatusConverter
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
@ -128,12 +128,7 @@ class KotlinConfigurationTransactionWrapper(private val model: EntityModel,
|
||||
}
|
||||
|
||||
class CordaDataSourceConnectionProvider(val dataSource: DataSource) : ConnectionProvider {
|
||||
override fun getConnection(): Connection {
|
||||
val tx = TransactionManager.manager.currentOrNull()
|
||||
return CordaConnection(
|
||||
tx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
|
||||
)
|
||||
}
|
||||
override fun getConnection(): Connection = CordaConnection(DatabaseTransactionManager.current().connection)
|
||||
}
|
||||
|
||||
class CordaConnection(val connection: Connection) : Connection by connection {
|
||||
|
@ -8,7 +8,7 @@ import io.requery.sql.KotlinEntityDataStore
|
||||
import io.requery.sql.SchemaModifier
|
||||
import io.requery.sql.TableCreationMode
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import net.corda.node.utilities.DatabaseTransactionManager
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
@ -46,9 +46,5 @@ class RequeryConfiguration(val properties: Properties, val useDefaultLogging: Bo
|
||||
}
|
||||
|
||||
// TODO: remove once Requery supports QUERY WITH COMPOSITE_KEY IN
|
||||
fun jdbcSession(): Connection {
|
||||
val ctx = TransactionManager.manager.currentOrNull()
|
||||
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
|
||||
}
|
||||
fun jdbcSession(): Connection = DatabaseTransactionManager.current().connection
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,6 @@ import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.security.PublicKey
|
||||
@ -74,7 +73,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
val serverAddress: NetworkHostAndPort,
|
||||
val myIdentity: PublicKey?,
|
||||
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
val database: Database,
|
||||
val database: CordaPersistence,
|
||||
val networkMapRegistrationFuture: ListenableFuture<Unit>,
|
||||
val monitoringService: MonitoringService,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
|
@ -18,11 +18,7 @@ import net.corda.core.utilities.*
|
||||
import net.corda.node.services.api.FlowAppAuditEvent
|
||||
import net.corda.node.services.api.FlowPermissionAuditEvent
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.utilities.StrandLocalTransactionManager
|
||||
import net.corda.node.utilities.createTransaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import net.corda.node.utilities.*
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.sql.Connection
|
||||
@ -53,23 +49,23 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
@Suspendable
|
||||
inline fun sleep(millis: Long) {
|
||||
if (currentStateMachine() != null) {
|
||||
val db = StrandLocalTransactionManager.database
|
||||
TransactionManager.current().commit()
|
||||
TransactionManager.current().close()
|
||||
val db = DatabaseTransactionManager.dataSource
|
||||
DatabaseTransactionManager.current().commit()
|
||||
DatabaseTransactionManager.current().close()
|
||||
Strand.sleep(millis)
|
||||
StrandLocalTransactionManager.database = db
|
||||
TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
DatabaseTransactionManager.dataSource = db
|
||||
DatabaseTransactionManager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
} else Strand.sleep(millis)
|
||||
}
|
||||
}
|
||||
|
||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||
@Transient override lateinit var serviceHub: ServiceHubInternal
|
||||
@Transient internal lateinit var database: Database
|
||||
@Transient internal lateinit var database: CordaPersistence
|
||||
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
|
||||
@Transient internal lateinit var actionOnEnd: (Try<R>, Boolean) -> Unit
|
||||
@Transient internal var fromCheckpoint: Boolean = false
|
||||
@Transient private var txTrampoline: Transaction? = null
|
||||
@Transient private var txTrampoline: DatabaseTransaction? = null
|
||||
|
||||
/**
|
||||
* Return the logger for this state machine. The logger name incorporates [id] and so including it in the log message
|
||||
@ -130,7 +126,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
private fun createTransaction() {
|
||||
// Make sure we have a database transaction
|
||||
database.createTransaction()
|
||||
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}" }
|
||||
logger.trace { "Starting database transaction ${DatabaseTransactionManager.currentOrNull()} on ${Strand.currentStrand()}" }
|
||||
}
|
||||
|
||||
private fun processException(exception: Throwable, propagated: Boolean) {
|
||||
@ -140,7 +136,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
internal fun commitTransaction() {
|
||||
val transaction = TransactionManager.current()
|
||||
val transaction = DatabaseTransactionManager.current()
|
||||
try {
|
||||
logger.trace { "Committing database transaction $transaction on ${Strand.currentStrand()}." }
|
||||
transaction.commit()
|
||||
@ -383,8 +379,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
private fun suspend(ioRequest: FlowIORequest) {
|
||||
// We have to pass the thread local database transaction across via a transient field as the fiber park
|
||||
// swaps them out.
|
||||
txTrampoline = TransactionManager.currentOrNull()
|
||||
StrandLocalTransactionManager.setThreadLocalTx(null)
|
||||
txTrampoline = DatabaseTransactionManager.setThreadLocalTx(null)
|
||||
if (ioRequest is WaitingRequest)
|
||||
waitingForResponse = ioRequest
|
||||
|
||||
@ -393,7 +388,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
logger.trace { "Suspended on $ioRequest" }
|
||||
// restore the Tx onto the ThreadLocal so that we can commit the ensuing checkpoint to the DB
|
||||
try {
|
||||
StrandLocalTransactionManager.setThreadLocalTx(txTrampoline)
|
||||
DatabaseTransactionManager.setThreadLocalTx(txTrampoline)
|
||||
txTrampoline = null
|
||||
actionOnSuspend(ioRequest)
|
||||
} catch (t: Throwable) {
|
||||
|
@ -39,7 +39,6 @@ import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.messaging.TopicSession
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -76,7 +75,7 @@ import kotlin.collections.ArrayList
|
||||
class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
val checkpointStorage: CheckpointStorage,
|
||||
val executor: AffinityExecutor,
|
||||
val database: Database,
|
||||
val database: CordaPersistence,
|
||||
private val unfinishedFibers: ReusableLatch = ReusableLatch()) {
|
||||
|
||||
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
|
||||
|
@ -36,7 +36,6 @@ import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Client
|
||||
import net.corda.node.services.transactions.BFTSMaRt.Replica
|
||||
import net.corda.node.utilities.JDBCHashMap
|
||||
import net.corda.node.utilities.transaction
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
||||
|
@ -8,9 +8,8 @@ import io.atomix.copycat.server.StateMachine
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.JDBCHashMap
|
||||
import net.corda.node.utilities.transaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
@ -21,7 +20,7 @@ import java.util.*
|
||||
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot
|
||||
* containing the entire JDBC table contents.
|
||||
*/
|
||||
class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: String) : StateMachine(), Snapshottable {
|
||||
class DistributedImmutableMap<K : Any, V : Any>(val db: CordaPersistence, tableName: String) : StateMachine(), Snapshottable {
|
||||
companion object {
|
||||
private val log = loggerFor<DistributedImmutableMap<*, *>>()
|
||||
}
|
||||
|
@ -23,8 +23,8 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -55,7 +55,7 @@ class RaftUniquenessProvider(services: ServiceHubInternal) : UniquenessProvider,
|
||||
*/
|
||||
private val clusterAddresses = services.configuration.notaryClusterAddresses
|
||||
/** The database to store the state machine state in */
|
||||
private val db: Database = services.database
|
||||
private val db: CordaPersistence = services.database
|
||||
/** SSL configuration */
|
||||
private val transportConfiguration: SSLConfiguration = services.configuration
|
||||
|
||||
|
@ -3,14 +3,13 @@ package net.corda.node.services.vault
|
||||
import com.codahale.metrics.Gauge
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.utilities.transaction
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* This class observes the vault and reflect current cash balances as exposed metrics in the monitoring service.
|
||||
*/
|
||||
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: Database) {
|
||||
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: CordaPersistence) {
|
||||
init {
|
||||
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
|
||||
serviceHubInternal.vaultService.updates.subscribe { _ ->
|
||||
|
@ -0,0 +1,193 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.io.Closeable
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
|
||||
//HikariDataSource implements also Closeable which allows CordaPersistence to be Closeable
|
||||
class CordaPersistence(var dataSource: HikariDataSource): Closeable {
|
||||
|
||||
/** Holds Exposed database, the field will be removed once Exposed library is removed */
|
||||
lateinit var database: Database
|
||||
|
||||
companion object {
|
||||
fun connect(dataSource: HikariDataSource): CordaPersistence {
|
||||
return CordaPersistence(dataSource).apply {
|
||||
DatabaseTransactionManager(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun createTransaction(): DatabaseTransaction {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
DatabaseTransactionManager.dataSource = this
|
||||
return DatabaseTransactionManager.currentOrNew(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
}
|
||||
|
||||
fun <T> isolatedTransaction(block: DatabaseTransaction.() -> T): T {
|
||||
val context = DatabaseTransactionManager.setThreadLocalTx(null)
|
||||
return try {
|
||||
transaction(block)
|
||||
} finally {
|
||||
DatabaseTransactionManager.restoreThreadLocalTx(context)
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> transaction(statement: DatabaseTransaction.() -> T): T {
|
||||
DatabaseTransactionManager.dataSource = this
|
||||
return transaction(Connection.TRANSACTION_REPEATABLE_READ, 3, statement)
|
||||
}
|
||||
|
||||
private fun <T> transaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
|
||||
val outer = DatabaseTransactionManager.currentOrNull()
|
||||
|
||||
return if (outer != null) {
|
||||
outer.statement()
|
||||
}
|
||||
else {
|
||||
inTopLevelTransaction(transactionIsolation, repetitionAttempts, statement)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T> inTopLevelTransaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
|
||||
var repetitions = 0
|
||||
while (true) {
|
||||
val transaction = DatabaseTransactionManager.currentOrNew(transactionIsolation)
|
||||
try {
|
||||
val answer = transaction.statement()
|
||||
transaction.commit()
|
||||
return answer
|
||||
}
|
||||
catch (e: SQLException) {
|
||||
transaction.rollback()
|
||||
repetitions++
|
||||
if (repetitions >= repetitionAttempts) {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
catch (e: Throwable) {
|
||||
transaction.rollback()
|
||||
throw e
|
||||
}
|
||||
finally {
|
||||
transaction.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
dataSource.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun configureDatabase(props: Properties): CordaPersistence {
|
||||
val config = HikariConfig(props)
|
||||
val dataSource = HikariDataSource(config)
|
||||
val persistence = CordaPersistence.connect(dataSource)
|
||||
|
||||
//org.jetbrains.exposed.sql.Database will be removed once Exposed library is removed
|
||||
val database = Database.connect(dataSource) { _ -> ExposedTransactionManager() }
|
||||
persistence.database = database
|
||||
|
||||
// Check not in read-only mode.
|
||||
persistence.transaction {
|
||||
persistence.dataSource.connection.use {
|
||||
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
|
||||
}
|
||||
}
|
||||
return persistence
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer observations until after the current database transaction has been closed. Observations are never
|
||||
* dropped, simply delayed.
|
||||
*
|
||||
* Primarily for use by component authors to publish observations during database transactions without racing against
|
||||
* closing the database transaction.
|
||||
*
|
||||
* For examples, see the call hierarchy of this function.
|
||||
*/
|
||||
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
||||
val currentTxId = DatabaseTransactionManager.transactionId
|
||||
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
|
||||
val subject = UnicastSubject.create<T>()
|
||||
subject.delaySubscription(databaseTxBoundary).subscribe(this)
|
||||
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
|
||||
return subject
|
||||
}
|
||||
|
||||
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
|
||||
private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?) : Subscriber<U>() {
|
||||
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
|
||||
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
|
||||
|
||||
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
|
||||
(db ?: DatabaseTransactionManager.dataSource).transaction {
|
||||
delegates.filter { !it.isUnsubscribed }.forEach {
|
||||
it.block()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onCompleted() = forEachSubscriberWithDbTx { onCompleted() }
|
||||
|
||||
override fun onError(e: Throwable?) = forEachSubscriberWithDbTx { onError(e) }
|
||||
|
||||
override fun onNext(s: U) = forEachSubscriberWithDbTx { onNext(s) }
|
||||
|
||||
override fun onStart() = forEachSubscriberWithDbTx { onStart() }
|
||||
|
||||
fun cleanUp() {
|
||||
if (delegates.removeIf { it.isUnsubscribed }) {
|
||||
if (delegates.isEmpty()) {
|
||||
unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A subscriber that wraps another but does not pass on observations to it.
|
||||
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
|
||||
override fun onCompleted() {
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable?) {
|
||||
}
|
||||
|
||||
override fun onNext(s: U) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap delivery of observations in a database transaction. Multiple subscribers will receive the observations inside
|
||||
* the same database transaction. This also lazily subscribes to the source [rx.Observable] to preserve any buffering
|
||||
* that might be in place.
|
||||
*/
|
||||
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: CordaPersistence? = null): rx.Observable<T> {
|
||||
var wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
|
||||
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
|
||||
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
|
||||
return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
|
||||
// Add the subscriber to the wrapping subscriber, which will invoke the original subscribers together inside a database transaction.
|
||||
wrappingSubscriber.delegates.add(toBeWrappedInDbTx)
|
||||
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
|
||||
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
|
||||
// Clean up the shared list of subscribers when they unsubscribe.
|
||||
}.doOnUnsubscribe {
|
||||
wrappingSubscriber.cleanUp()
|
||||
// If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again
|
||||
if (wrappingSubscriber.delegates.isEmpty()) {
|
||||
wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,277 +1,26 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.parsePublicKeyBase58
|
||||
import net.corda.core.crypto.toBase58String
|
||||
import net.corda.node.utilities.StrandLocalTransactionManager.Boundary
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
import org.h2.jdbc.JdbcBlob
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.Subject
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.Closeable
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.sql.Connection
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneOffset
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
|
||||
/**
|
||||
* Table prefix for all tables owned by the node module.
|
||||
*/
|
||||
const val NODE_DATABASE_PREFIX = "node_"
|
||||
|
||||
@Deprecated("Use Database.transaction instead.")
|
||||
fun <T> databaseTransaction(db: Database, statement: Transaction.() -> T) = db.transaction(statement)
|
||||
|
||||
// TODO: Handle commit failure due to database unavailable. Better to shutdown and await database reconnect/recovery.
|
||||
fun <T> Database.transaction(statement: Transaction.() -> T): T {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
StrandLocalTransactionManager.database = this
|
||||
return org.jetbrains.exposed.sql.transactions.transaction(Connection.TRANSACTION_REPEATABLE_READ, 1, statement)
|
||||
}
|
||||
|
||||
fun Database.createTransaction(): Transaction {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
StrandLocalTransactionManager.database = this
|
||||
return TransactionManager.currentOrNew(Connection.TRANSACTION_REPEATABLE_READ)
|
||||
}
|
||||
|
||||
fun configureDatabase(props: Properties): Pair<Closeable, Database> {
|
||||
val config = HikariConfig(props)
|
||||
val dataSource = HikariDataSource(config)
|
||||
val database = Database.connect(dataSource) { db -> StrandLocalTransactionManager(db) }
|
||||
// Check not in read-only mode.
|
||||
database.transaction {
|
||||
check(!database.metadata.isReadOnly) { "Database should not be readonly." }
|
||||
}
|
||||
return Pair(dataSource, database)
|
||||
}
|
||||
|
||||
fun <T> Database.isolatedTransaction(block: Transaction.() -> T): T {
|
||||
val oldContext = StrandLocalTransactionManager.setThreadLocalTx(null)
|
||||
return try {
|
||||
transaction(block)
|
||||
} finally {
|
||||
StrandLocalTransactionManager.restoreThreadLocalTx(oldContext)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A relatively close copy of the [org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManager]
|
||||
* in Exposed but with the following adjustments to suit our environment:
|
||||
*
|
||||
* Because the construction of a [Database] instance results in replacing the singleton [TransactionManager] instance,
|
||||
* our tests involving two [MockNode]s effectively replace the database instances of each other and continue to trample
|
||||
* over each other. So here we use a companion object to hold them as [ThreadLocal] and [StrandLocalTransactionManager]
|
||||
* is otherwise effectively stateless so it's replacement does not matter. The [ThreadLocal] is then set correctly and
|
||||
* explicitly just prior to initiating a transaction in [transaction] and [createTransaction] above.
|
||||
*
|
||||
* The [StrandLocalTransactionManager] instances have an [Observable] of the transaction close [Boundary]s which
|
||||
* facilitates the use of [Observable.afterDatabaseCommit] to create event streams that only emit once the database
|
||||
* transaction is closed and the data has been persisted and becomes visible to other observers.
|
||||
*/
|
||||
class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionManager {
|
||||
|
||||
companion object {
|
||||
private val TX_ID = Key<UUID>()
|
||||
|
||||
private val threadLocalDb = ThreadLocal<Database>()
|
||||
private val threadLocalTx = ThreadLocal<Transaction>()
|
||||
private val databaseToInstance = ConcurrentHashMap<Database, StrandLocalTransactionManager>()
|
||||
|
||||
fun setThreadLocalTx(tx: Transaction?): Pair<Database?, Transaction?> {
|
||||
val oldTx = threadLocalTx.get()
|
||||
threadLocalTx.set(tx)
|
||||
return Pair(threadLocalDb.get(), oldTx)
|
||||
}
|
||||
|
||||
fun restoreThreadLocalTx(context: Pair<Database?, Transaction?>) {
|
||||
threadLocalDb.set(context.first)
|
||||
threadLocalTx.set(context.second)
|
||||
}
|
||||
|
||||
var database: Database
|
||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find database set on current strand: ${Strand.currentStrand()}")
|
||||
set(value) {
|
||||
threadLocalDb.set(value)
|
||||
}
|
||||
|
||||
val transactionId: UUID
|
||||
get() = threadLocalTx.get()?.getUserData(TX_ID) ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
||||
|
||||
val manager: StrandLocalTransactionManager get() = databaseToInstance[database]!!
|
||||
|
||||
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
|
||||
}
|
||||
|
||||
|
||||
data class Boundary(val txId: UUID)
|
||||
|
||||
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
||||
|
||||
init {
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// databae transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||
// but any manual database transaction management is liable to have this problem.
|
||||
if (threadLocalTx.get() != null) {
|
||||
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
|
||||
}
|
||||
database = initWithDatabase
|
||||
databaseToInstance[database] = this
|
||||
}
|
||||
|
||||
override fun newTransaction(isolation: Int): Transaction {
|
||||
val impl = StrandLocalTransaction(database, isolation, threadLocalTx, transactionBoundaries)
|
||||
return Transaction(impl).apply {
|
||||
threadLocalTx.set(this)
|
||||
putUserData(TX_ID, impl.id)
|
||||
}
|
||||
}
|
||||
|
||||
override fun currentOrNull(): Transaction? = threadLocalTx.get()
|
||||
|
||||
// Direct copy of [ThreadLocalTransaction].
|
||||
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>, val transactionBoundaries: Subject<Boundary, Boundary>) : TransactionInterface {
|
||||
val id = UUID.randomUUID()
|
||||
|
||||
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
db.connector().apply {
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
}
|
||||
|
||||
override val outerTransaction = threadLocal.get()
|
||||
|
||||
override fun commit() {
|
||||
connection.commit()
|
||||
}
|
||||
|
||||
override fun rollback() {
|
||||
if (!connection.isClosed) {
|
||||
connection.rollback()
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
connection.close()
|
||||
threadLocal.set(outerTransaction)
|
||||
if (outerTransaction == null) {
|
||||
transactionBoundaries.onNext(Boundary(id))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer observations until after the current database transaction has been closed. Observations are never
|
||||
* dropped, simply delayed.
|
||||
*
|
||||
* Primarily for use by component authors to publish observations during database transactions without racing against
|
||||
* closing the database transaction.
|
||||
*
|
||||
* For examples, see the call hierarchy of this function.
|
||||
*/
|
||||
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
|
||||
val currentTxId = StrandLocalTransactionManager.transactionId
|
||||
val databaseTxBoundary: Observable<StrandLocalTransactionManager.Boundary> = StrandLocalTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
|
||||
val subject = UnicastSubject.create<T>()
|
||||
subject.delaySubscription(databaseTxBoundary).subscribe(this)
|
||||
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
|
||||
return subject
|
||||
}
|
||||
|
||||
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
|
||||
private class DatabaseTransactionWrappingSubscriber<U>(val db: Database?) : Subscriber<U>() {
|
||||
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
|
||||
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
|
||||
|
||||
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
|
||||
(db ?: StrandLocalTransactionManager.database).transaction {
|
||||
delegates.filter { !it.isUnsubscribed }.forEach {
|
||||
it.block()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
forEachSubscriberWithDbTx { onCompleted() }
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable?) {
|
||||
forEachSubscriberWithDbTx { onError(e) }
|
||||
}
|
||||
|
||||
override fun onNext(s: U) {
|
||||
forEachSubscriberWithDbTx { onNext(s) }
|
||||
}
|
||||
|
||||
override fun onStart() {
|
||||
forEachSubscriberWithDbTx { onStart() }
|
||||
}
|
||||
|
||||
fun cleanUp() {
|
||||
if (delegates.removeIf { it.isUnsubscribed }) {
|
||||
if (delegates.isEmpty()) {
|
||||
unsubscribe()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A subscriber that wraps another but does not pass on observations to it.
|
||||
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
|
||||
override fun onCompleted() {
|
||||
}
|
||||
|
||||
override fun onError(e: Throwable?) {
|
||||
}
|
||||
|
||||
override fun onNext(s: U) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap delivery of observations in a database transaction. Multiple subscribers will receive the observations inside
|
||||
* the same database transaction. This also lazily subscribes to the source [rx.Observable] to preserve any buffering
|
||||
* that might be in place.
|
||||
*/
|
||||
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable<T> {
|
||||
var wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
|
||||
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
|
||||
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
|
||||
return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
|
||||
// Add the subscriber to the wrapping subscriber, which will invoke the original subscribers together inside a database transaction.
|
||||
wrappingSubscriber.delegates.add(toBeWrappedInDbTx)
|
||||
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
|
||||
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
|
||||
// Clean up the shared list of subscribers when they unsubscribe.
|
||||
}.doOnUnsubscribe {
|
||||
wrappingSubscriber.cleanUp()
|
||||
// If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again
|
||||
if (wrappingSubscriber.delegates.isEmpty()) {
|
||||
wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Composite columns for use with below Exposed helpers.
|
||||
data class PartyColumns(val name: Column<String>, val owningKey: Column<PublicKey>)
|
||||
data class PartyAndCertificateColumns(val name: Column<String>, val owningKey: Column<PublicKey>,
|
||||
|
@ -0,0 +1,106 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import rx.subjects.PublishSubject
|
||||
import rx.subjects.Subject
|
||||
import java.sql.Connection
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class DatabaseTransaction(isolation: Int, val threadLocal: ThreadLocal<DatabaseTransaction>,
|
||||
val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
|
||||
val cordaPersistence: CordaPersistence) {
|
||||
|
||||
val id: UUID = UUID.randomUUID()
|
||||
|
||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
cordaPersistence.dataSource.connection
|
||||
.apply {
|
||||
autoCommit = false
|
||||
transactionIsolation = isolation
|
||||
}
|
||||
}
|
||||
|
||||
val outerTransaction: DatabaseTransaction? = threadLocal.get()
|
||||
|
||||
fun commit() {
|
||||
connection.commit()
|
||||
}
|
||||
|
||||
fun rollback() {
|
||||
if (!connection.isClosed) {
|
||||
connection.rollback()
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
connection.close()
|
||||
threadLocal.set(outerTransaction)
|
||||
if (outerTransaction == null) {
|
||||
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DatabaseTransactionManager(initDataSource: CordaPersistence) {
|
||||
companion object {
|
||||
private val threadLocalDb = ThreadLocal<CordaPersistence>()
|
||||
private val threadLocalTx = ThreadLocal<DatabaseTransaction>()
|
||||
private val databaseToInstance = ConcurrentHashMap<CordaPersistence, DatabaseTransactionManager>()
|
||||
|
||||
fun setThreadLocalTx(tx: DatabaseTransaction?): DatabaseTransaction? {
|
||||
val oldTx = threadLocalTx.get()
|
||||
threadLocalTx.set(tx)
|
||||
return oldTx
|
||||
}
|
||||
|
||||
fun restoreThreadLocalTx(context: DatabaseTransaction?) {
|
||||
if (context != null) {
|
||||
threadLocalDb.set(context.cordaPersistence)
|
||||
}
|
||||
threadLocalTx.set(context)
|
||||
}
|
||||
|
||||
var dataSource: CordaPersistence
|
||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
|
||||
set(value) = threadLocalDb.set(value)
|
||||
|
||||
val transactionId: UUID
|
||||
get() = threadLocalTx.get()?.id ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
|
||||
|
||||
val manager: DatabaseTransactionManager get() = databaseToInstance[dataSource]!!
|
||||
|
||||
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
|
||||
|
||||
fun currentOrNull(): DatabaseTransaction? = manager.currentOrNull()
|
||||
|
||||
fun currentOrNew(isolation: Int) = currentOrNull() ?: manager.newTransaction(isolation)
|
||||
|
||||
fun current(): DatabaseTransaction = currentOrNull() ?: error("No transaction in context.")
|
||||
|
||||
fun newTransaction(isolation: Int) = manager.newTransaction(isolation)
|
||||
}
|
||||
|
||||
data class Boundary(val txId: UUID)
|
||||
|
||||
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
|
||||
|
||||
init {
|
||||
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
|
||||
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
|
||||
// database transaction open. The [transaction] helper above handles this in a finally clause for you
|
||||
// but any manual database transaction management is liable to have this problem.
|
||||
if (threadLocalTx.get() != null) {
|
||||
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
|
||||
}
|
||||
dataSource = initDataSource
|
||||
databaseToInstance[dataSource] = this
|
||||
}
|
||||
|
||||
private fun newTransaction(isolation: Int) =
|
||||
DatabaseTransaction(isolation, threadLocalTx, transactionBoundaries, dataSource).apply {
|
||||
threadLocalTx.set(this)
|
||||
}
|
||||
|
||||
private fun currentOrNull(): DatabaseTransaction? = threadLocalTx.get()
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Connection
|
||||
|
||||
/**
|
||||
* Wrapper of [DatabaseTransaction], because the class is effectively used for [ExposedTransaction.connection] method only not all methods are implemented.
|
||||
* The class will obsolete when Exposed library is phased out.
|
||||
*/
|
||||
class ExposedTransaction(override val db: Database, val databaseTransaction: DatabaseTransaction): TransactionInterface {
|
||||
|
||||
override val outerTransaction: Transaction?
|
||||
get() = throw UnsupportedOperationException()
|
||||
|
||||
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
databaseTransaction.connection
|
||||
}
|
||||
|
||||
override fun commit() {
|
||||
databaseTransaction.commit()
|
||||
}
|
||||
|
||||
override fun rollback() {
|
||||
databaseTransaction.rollback()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
databaseTransaction.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates methods to [DatabaseTransactionManager].
|
||||
* The class will obsolete when Exposed library is phased out.
|
||||
*/
|
||||
class ExposedTransactionManager: TransactionManager {
|
||||
companion object {
|
||||
val database: Database
|
||||
get() = DatabaseTransactionManager.dataSource.database
|
||||
}
|
||||
|
||||
override fun newTransaction(isolation: Int): Transaction {
|
||||
var databaseTransaction = DatabaseTransactionManager.newTransaction(isolation)
|
||||
return Transaction(ExposedTransaction(database, databaseTransaction))
|
||||
}
|
||||
|
||||
override fun currentOrNull(): Transaction? {
|
||||
val databaseTransaction = DatabaseTransactionManager.currentOrNull()
|
||||
return if (databaseTransaction != null) Transaction(ExposedTransaction(database, databaseTransaction)) else null
|
||||
}
|
||||
}
|
@ -8,7 +8,6 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.sql.Blob
|
||||
import java.util.*
|
||||
import kotlin.system.measureTimeMillis
|
||||
@ -60,7 +59,7 @@ class JDBCHashMap<K : Any, V : Any>(tableName: String,
|
||||
}
|
||||
|
||||
fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>): Blob {
|
||||
val blob = TransactionManager.current().connection.createBlob()
|
||||
val blob = DatabaseTransactionManager.current().connection.createBlob()
|
||||
finalizables += { blob.free() }
|
||||
blob.setBytes(1, value.bytes)
|
||||
return blob
|
||||
|
@ -11,9 +11,11 @@ import net.corda.core.messaging.*;
|
||||
import net.corda.core.node.services.*;
|
||||
import net.corda.core.node.services.vault.*;
|
||||
import net.corda.core.node.services.vault.QueryCriteria.*;
|
||||
import net.corda.testing.contracts.DummyLinearContract;
|
||||
import net.corda.core.schemas.*;
|
||||
import net.corda.core.transactions.*;
|
||||
import net.corda.core.utilities.*;
|
||||
import net.corda.node.utilities.CordaPersistence;
|
||||
import net.corda.node.services.database.*;
|
||||
import net.corda.node.services.schema.*;
|
||||
import net.corda.schemas.*;
|
||||
@ -22,7 +24,6 @@ import net.corda.testing.contracts.*;
|
||||
import net.corda.testing.node.*;
|
||||
import net.corda.testing.schemas.DummyLinearStateSchemaV1;
|
||||
import org.jetbrains.annotations.*;
|
||||
import org.jetbrains.exposed.sql.*;
|
||||
import org.junit.*;
|
||||
import rx.Observable;
|
||||
|
||||
@ -34,8 +35,7 @@ import java.util.stream.*;
|
||||
import static net.corda.contracts.asset.CashKt.*;
|
||||
import static net.corda.core.contracts.ContractsDSL.*;
|
||||
import static net.corda.core.node.services.vault.QueryCriteriaUtils.*;
|
||||
import static net.corda.node.utilities.DatabaseSupportKt.*;
|
||||
import static net.corda.node.utilities.DatabaseSupportKt.transaction;
|
||||
import static net.corda.node.utilities.CordaPersistenceKt.configureDatabase;
|
||||
import static net.corda.testing.CoreTestUtils.*;
|
||||
import static net.corda.testing.node.MockServicesKt.*;
|
||||
import static net.corda.core.utilities.ByteArrays.toHexString;
|
||||
@ -46,19 +46,16 @@ public class VaultQueryJavaTests {
|
||||
private MockServices services;
|
||||
private VaultService vaultSvc;
|
||||
private VaultQueryService vaultQuerySvc;
|
||||
private Closeable dataSource;
|
||||
private Database database;
|
||||
private CordaPersistence database;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties dataSourceProps = makeTestDataSourceProperties(SecureHash.randomSHA256().toString());
|
||||
Pair<Closeable, Database> dataSourceAndDatabase = configureDatabase(dataSourceProps);
|
||||
dataSource = dataSourceAndDatabase.getFirst();
|
||||
database = dataSourceAndDatabase.getSecond();
|
||||
database = configureDatabase(dataSourceProps);
|
||||
|
||||
Set<MappedSchema> customSchemas = new HashSet<>(Collections.singletonList(DummyLinearStateSchemaV1.INSTANCE));
|
||||
HibernateConfiguration hibernateConfig = new HibernateConfiguration(new NodeSchemaService(customSchemas));
|
||||
transaction(database,
|
||||
database.transaction(
|
||||
statement -> { services = new MockServices(getMEGA_CORP_KEY()) {
|
||||
@NotNull
|
||||
@Override
|
||||
@ -91,7 +88,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@After
|
||||
public void cleanUp() throws IOException {
|
||||
dataSource.close();
|
||||
database.close();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -104,7 +101,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void unconsumedLinearStates() throws VaultQueryException {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
VaultFiller.fillWithSomeTestLinearStates(services, 3);
|
||||
|
||||
@ -120,7 +117,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void unconsumedStatesForStateRefsSortedByTxnId() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
VaultFiller.fillWithSomeTestLinearStates(services, 8);
|
||||
Vault<LinearState> issuedStates = VaultFiller.fillWithSomeTestLinearStates(services, 2);
|
||||
@ -145,7 +142,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void consumedCashStates() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> amount = new Amount<>(100, Currency.getInstance("USD"));
|
||||
|
||||
@ -175,7 +172,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void consumedDealStatesPagedSorted() throws VaultQueryException {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
StateAndRef<LinearState> linearState = states.getStates().iterator().next();
|
||||
@ -217,7 +214,7 @@ public class VaultQueryJavaTests {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void customQueryForCashStatesWithAmountOfCurrencyGreaterOrEqualThanQuantity() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> pounds = new Amount<>(100, Currency.getInstance("GBP"));
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
@ -261,7 +258,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void trackCashStates() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
VaultFiller.fillWithSomeTestCash(services,
|
||||
new Amount<>(100, Currency.getInstance("USD")),
|
||||
TestConstants.getDUMMY_NOTARY(),
|
||||
@ -292,7 +289,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void trackDealStatesPagedSorted() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Vault<LinearState> states = VaultFiller.fillWithSomeTestLinearStates(services, 10, null);
|
||||
UniqueIdentifier uid = states.getStates().iterator().next().component1().getData().getLinearId();
|
||||
@ -333,7 +330,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void consumedStatesDeprecated() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
Amount<Currency> amount = new Amount<>(100, USD);
|
||||
VaultFiller.fillWithSomeTestCash(services,
|
||||
new Amount<>(100, USD),
|
||||
@ -365,7 +362,7 @@ public class VaultQueryJavaTests {
|
||||
|
||||
@Test
|
||||
public void consumedStatesForLinearIdDeprecated() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Vault<LinearState> linearStates = VaultFiller.fillWithSomeTestLinearStates(services, 4,null);
|
||||
linearStates.getStates().iterator().next().component1().getData().getLinearId();
|
||||
@ -394,7 +391,7 @@ public class VaultQueryJavaTests {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void aggregateFunctionsWithoutGroupClause() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars200 = new Amount<>(200, Currency.getInstance("USD"));
|
||||
@ -439,7 +436,7 @@ public class VaultQueryJavaTests {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void aggregateFunctionsWithSingleGroupClause() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars200 = new Amount<>(200, Currency.getInstance("USD"));
|
||||
@ -510,7 +507,7 @@ public class VaultQueryJavaTests {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void aggregateFunctionsSumByIssuerAndCurrencyAndSortByAggregateSum() {
|
||||
transaction(database, tx -> {
|
||||
database.transaction(tx -> {
|
||||
|
||||
Amount<Currency> dollars100 = new Amount<>(100, Currency.getInstance("USD"));
|
||||
Amount<Currency> dollars200 = new Amount<>(200, Currency.getInstance("USD"));
|
||||
|
@ -22,7 +22,6 @@ import net.corda.node.services.messaging.RpcContext
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.startFlowPermission
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.nodeapi.PermissionException
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.expect
|
||||
|
@ -13,16 +13,13 @@ import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.Closeable
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyPair
|
||||
import java.util.jar.JarOutputStream
|
||||
@ -32,8 +29,6 @@ import kotlin.test.assertFailsWith
|
||||
|
||||
class AttachmentTests {
|
||||
lateinit var mockNet: MockNetwork
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@Before
|
||||
|
@ -35,14 +35,13 @@ import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.fillWithSomeTestCash
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -681,7 +680,7 @@ class TwoPartyTradeFlowTests {
|
||||
}
|
||||
|
||||
|
||||
class RecordingTransactionStorage(val database: Database, val delegate: WritableTransactionStorage) : WritableTransactionStorage {
|
||||
class RecordingTransactionStorage(val database: CordaPersistence, val delegate: WritableTransactionStorage) : WritableTransactionStorage {
|
||||
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
|
||||
return database.transaction {
|
||||
delegate.track()
|
||||
|
@ -15,16 +15,16 @@ import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.testing.MOCK_IDENTITY_SERVICE
|
||||
import net.corda.testing.node.MockAttachmentStorage
|
||||
import net.corda.testing.node.MockNetworkMapCache
|
||||
import net.corda.testing.node.MockStateMachineRecordedTransactionMappingStorage
|
||||
import net.corda.testing.node.MockTransactionStorage
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.time.Clock
|
||||
|
||||
open class MockServiceHubInternal(
|
||||
override val database: Database,
|
||||
override val database: CordaPersistence,
|
||||
override val configuration: NodeConfiguration,
|
||||
val customVault: VaultService? = null,
|
||||
val customVaultQuery: VaultQueryService? = null,
|
||||
|
@ -14,7 +14,6 @@ import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.testing.schemas.DummyLinearStateSchemaV1
|
||||
import net.corda.testing.schemas.DummyLinearStateSchemaV2
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.storageKryo
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.testing.ALICE
|
||||
@ -25,9 +24,10 @@ import net.corda.node.services.schema.HibernateObserver
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.core.schemas.CommonSchemaV1
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.schemas.CashSchemaV1
|
||||
import net.corda.schemas.SampleCashSchemaV2
|
||||
import net.corda.schemas.SampleCashSchemaV3
|
||||
@ -39,11 +39,9 @@ import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.hibernate.SessionFactory
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.EntityManager
|
||||
@ -53,8 +51,7 @@ import javax.persistence.criteria.CriteriaBuilder
|
||||
class HibernateConfigurationTest {
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
val vault: VaultService get() = services.vaultService
|
||||
|
||||
// Hibernate configuration objects
|
||||
@ -70,11 +67,9 @@ class HibernateConfigurationTest {
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
database = configureDatabase(dataSourceProps)
|
||||
val customSchemas = setOf(VaultSchemaV1, CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3)
|
||||
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database.transaction {
|
||||
|
||||
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas))
|
||||
@ -104,7 +99,7 @@ class HibernateConfigurationTest {
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
private fun setUpDb() {
|
||||
|
@ -23,40 +23,35 @@ import net.corda.node.services.vault.schemas.requery.Models
|
||||
import net.corda.node.services.vault.schemas.requery.VaultCashBalancesEntity
|
||||
import net.corda.node.services.vault.schemas.requery.VaultSchema
|
||||
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
class RequeryConfigurationTest {
|
||||
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var transactionStorage: DBTransactionStorage
|
||||
lateinit var requerySession: KotlinEntityDataStore<Persistable>
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProperties)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProperties)
|
||||
newTransactionStorage()
|
||||
newRequeryStorage(dataSourceProperties)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -18,9 +18,7 @@ import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.testing.getTestX509Name
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockKeyManagementService
|
||||
@ -29,11 +27,9 @@ import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.testNodeConfiguration
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.nio.file.Paths
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
@ -54,8 +50,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
|
||||
lateinit var scheduler: NodeSchedulerService
|
||||
lateinit var smmExecutor: AffinityExecutor.ServiceAffinityExecutor
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var countDown: CountDownLatch
|
||||
lateinit var smmHasRemovedAllFlows: CountDownLatch
|
||||
|
||||
@ -76,9 +71,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
smmHasRemovedAllFlows = CountDownLatch(1)
|
||||
calls = 0
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProps)
|
||||
val identityService = InMemoryIdentityService(trustRoot = DUMMY_CA.certificate)
|
||||
val kms = MockKeyManagementService(identityService, ALICE_KEY)
|
||||
|
||||
@ -120,7 +113,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
}
|
||||
smmExecutor.shutdown()
|
||||
smmExecutor.awaitTermination(60, TimeUnit.SECONDS)
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant) : LinearState, SchedulableState {
|
||||
|
@ -11,7 +11,6 @@ import net.corda.core.node.services.linearHeadsOfType
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.node.MockNetwork
|
||||
|
@ -19,8 +19,8 @@ import net.corda.node.services.network.InMemoryNetworkMapCache
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.freePort
|
||||
import net.corda.testing.node.MOCK_VERSION_INFO
|
||||
@ -28,13 +28,11 @@ import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import net.corda.testing.testNodeConfiguration
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.io.Closeable
|
||||
import java.net.ServerSocket
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
@ -52,8 +50,7 @@ class ArtemisMessagingTests {
|
||||
val identity = generateKeyPair()
|
||||
|
||||
lateinit var config: NodeConfiguration
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var userService: RPCUserService
|
||||
lateinit var networkMapRegistrationFuture: ListenableFuture<Unit>
|
||||
|
||||
@ -75,9 +72,7 @@ class ArtemisMessagingTests {
|
||||
baseDirectory = baseDirectory,
|
||||
myLegalName = ALICE.name)
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
|
||||
}
|
||||
|
||||
@ -87,7 +82,7 @@ class ArtemisMessagingTests {
|
||||
messagingServer?.stop()
|
||||
messagingClient = null
|
||||
messagingServer = null
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,6 @@ import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.testing.ALICE
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
|
@ -4,7 +4,6 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNetwork.MockNode
|
||||
import java.math.BigInteger
|
||||
|
@ -6,16 +6,14 @@ import net.corda.testing.LogHelper
|
||||
import net.corda.node.services.api.Checkpoint
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
|
||||
internal fun CheckpointStorage.checkpoints(): List<Checkpoint> {
|
||||
val checkpoints = mutableListOf<Checkpoint>()
|
||||
@ -28,21 +26,18 @@ internal fun CheckpointStorage.checkpoints(): List<Checkpoint> {
|
||||
|
||||
class DBCheckpointStorageTests {
|
||||
lateinit var checkpointStorage: DBCheckpointStorage
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
newCheckpointStorage()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||
}
|
||||
|
||||
|
@ -11,35 +11,30 @@ import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class DBTransactionStorageTests {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var transactionStorage: DBTransactionStorage
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
newTransactionStorage()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||
}
|
||||
|
||||
|
@ -14,7 +14,6 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.NotifyTransactionHandler
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.node.MockNetwork
|
||||
|
@ -13,14 +13,12 @@ import net.corda.core.writeLines
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.nio.charset.Charset
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.FileSystem
|
||||
@ -35,8 +33,7 @@ import kotlin.test.assertNull
|
||||
class NodeAttachmentStorageTest {
|
||||
// Use an in memory file system for testing attachment storage.
|
||||
lateinit var fs: FileSystem
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
lateinit var dataSourceProperties: Properties
|
||||
lateinit var configuration: RequeryConfiguration
|
||||
|
||||
@ -45,9 +42,7 @@ class NodeAttachmentStorageTest {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
|
||||
dataSourceProperties = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProperties)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProperties)
|
||||
|
||||
configuration = RequeryConfiguration(dataSourceProperties)
|
||||
fs = Jimfs.newFileSystem(Configuration.unix())
|
||||
@ -55,7 +50,7 @@ class NodeAttachmentStorageTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -10,38 +10,33 @@ import net.corda.core.schemas.QueryableState
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.hibernate.annotations.Cascade
|
||||
import org.hibernate.annotations.CascadeType
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import javax.persistence.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
|
||||
class HibernateObserverTests {
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(HibernateObserver::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(HibernateObserver::class)
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,6 @@ import net.corda.flows.CashPaymentFlow
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
@ -90,6 +89,7 @@ class FlowFrameworkTests {
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
sessionTransfers.clear()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -10,15 +10,14 @@ import net.corda.core.getOrThrow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
@ -27,17 +26,14 @@ class DistributedImmutableMapTests {
|
||||
data class Member(val client: CopycatClient, val server: CopycatServer)
|
||||
|
||||
lateinit var cluster: List<Member>
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var transaction: Transaction
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@ -49,7 +45,7 @@ class DistributedImmutableMapTests {
|
||||
it.client.close()
|
||||
it.server.shutdown()
|
||||
}
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2,17 +2,15 @@ package net.corda.node.services.transactions
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.services.UniquenessException
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.generateStateRef
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
@ -20,20 +18,17 @@ class PersistentUniquenessProviderTests {
|
||||
val identity = MEGA_CORP
|
||||
val txID = SecureHash.randomSHA256()
|
||||
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||
}
|
||||
|
||||
|
@ -14,19 +14,17 @@ import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.contracts.fillWithSomeTestCash
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executors
|
||||
@ -38,16 +36,13 @@ import kotlin.test.assertTrue
|
||||
class NodeVaultServiceTest {
|
||||
lateinit var services: MockServices
|
||||
val vaultSvc: VaultService get() = services.vaultService
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(NodeVaultService::class)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProps)
|
||||
database.transaction {
|
||||
services = object : MockServices() {
|
||||
override val vaultService: VaultService = makeVaultService(dataSourceProps)
|
||||
@ -65,7 +60,7 @@ class NodeVaultServiceTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
LogHelper.reset(NodeVaultService::class)
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,8 @@ import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.schemas.CashSchemaV1
|
||||
import net.corda.schemas.CashSchemaV1.PersistentCashState
|
||||
import net.corda.schemas.CommercialPaperSchemaV1
|
||||
@ -35,10 +35,8 @@ import org.assertj.core.api.Assertions
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.*
|
||||
import org.junit.rules.ExpectedException
|
||||
import java.io.Closeable
|
||||
import java.lang.Thread.sleep
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyPair
|
||||
@ -53,15 +51,12 @@ class VaultQueryTests {
|
||||
lateinit var services: MockServices
|
||||
val vaultSvc: VaultService get() = services.vaultService
|
||||
val vaultQuerySvc: VaultQueryService get() = services.vaultQueryService
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProps)
|
||||
database.transaction {
|
||||
val customSchemas = setOf(CommercialPaperSchemaV1, DummyLinearStateSchemaV1)
|
||||
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas))
|
||||
@ -82,7 +77,7 @@ class VaultQueryTests {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -91,16 +86,14 @@ class VaultQueryTests {
|
||||
@Ignore
|
||||
@Test
|
||||
fun createPersistentTestDb() {
|
||||
val dataSourceAndDatabase = configureDatabase(makePersistentDataSourceProperties())
|
||||
val dataSource = dataSourceAndDatabase.first
|
||||
val database = dataSourceAndDatabase.second
|
||||
val database = configureDatabase(makePersistentDataSourceProperties())
|
||||
|
||||
setUpDb(database, 5000)
|
||||
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
private fun setUpDb(_database: Database, delay: Long = 0) {
|
||||
private fun setUpDb(_database: CordaPersistence, delay: Long = 0) {
|
||||
|
||||
_database.transaction {
|
||||
|
||||
|
@ -13,23 +13,21 @@ import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.consumedStates
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.BOB
|
||||
import net.corda.testing.DUMMY_NOTARY
|
||||
import net.corda.testing.DUMMY_NOTARY_KEY
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MEGA_CORP_KEY
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executors
|
||||
@ -41,17 +39,14 @@ import kotlin.test.assertNull
|
||||
class VaultWithCashTest {
|
||||
lateinit var services: MockServices
|
||||
val vault: VaultService get() = services.vaultService
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
val notaryServices = MockServices(DUMMY_NOTARY_KEY)
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
LogHelper.setLevel(VaultWithCashTest::class)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val dataSourceAndDatabase = configureDatabase(dataSourceProps)
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(dataSourceProps)
|
||||
database.transaction {
|
||||
services = object : MockServices() {
|
||||
override val vaultService: VaultService = makeVaultService(dataSourceProps)
|
||||
@ -70,7 +65,7 @@ class VaultWithCashTest {
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset(VaultWithCashTest::class)
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -5,8 +5,6 @@ import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.tee
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
@ -16,13 +14,13 @@ import java.util.*
|
||||
|
||||
class ObservablesTests {
|
||||
|
||||
private fun isInDatabaseTransaction(): Boolean = (TransactionManager.currentOrNull() != null)
|
||||
private fun isInDatabaseTransaction(): Boolean = (DatabaseTransactionManager.currentOrNull() != null)
|
||||
|
||||
val toBeClosed = mutableListOf<Closeable>()
|
||||
|
||||
fun createDatabase(): Database {
|
||||
val (closeable, database) = configureDatabase(makeTestDataSourceProperties())
|
||||
toBeClosed += closeable
|
||||
fun createDatabase(): CordaPersistence {
|
||||
val database = configureDatabase(makeTestDataSourceProperties())
|
||||
toBeClosed += database
|
||||
return database
|
||||
}
|
||||
|
||||
@ -167,7 +165,7 @@ class ObservablesTests {
|
||||
observableWithDbTx.first().subscribe { undelayedEvent.set(it to isInDatabaseTransaction()) }
|
||||
|
||||
fun observeSecondEvent(event: Int, future: SettableFuture<Pair<Int, UUID?>>) {
|
||||
future.set(event to if (isInDatabaseTransaction()) StrandLocalTransactionManager.transactionId else null)
|
||||
future.set(event to if (isInDatabaseTransaction()) DatabaseTransactionManager.transactionId else null)
|
||||
}
|
||||
|
||||
observableWithDbTx.skip(1).first().subscribe { observeSecondEvent(it, delayedEventFromSecondObserver) }
|
||||
|
@ -17,19 +17,17 @@ import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.irs.flows.RatesFixFlow
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.io.Closeable
|
||||
import java.math.BigDecimal
|
||||
import java.util.function.Predicate
|
||||
import kotlin.test.assertEquals
|
||||
@ -50,8 +48,7 @@ class NodeInterestRatesTest {
|
||||
val DUMMY_CASH_ISSUER = Party(X500Name("CN=Cash issuer,O=R3,OU=corda,L=London,C=GB"), DUMMY_CASH_ISSUER_KEY.public)
|
||||
|
||||
lateinit var oracle: NodeInterestRates.Oracle
|
||||
lateinit var dataSource: Closeable
|
||||
lateinit var database: Database
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
fun fixCmdFilter(elem: Any): Boolean {
|
||||
return when (elem) {
|
||||
@ -64,9 +61,7 @@ class NodeInterestRatesTest {
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
|
||||
dataSource = dataSourceAndDatabase.first
|
||||
database = dataSourceAndDatabase.second
|
||||
database = configureDatabase(makeTestDataSourceProperties())
|
||||
database.transaction {
|
||||
oracle = NodeInterestRates.Oracle(
|
||||
MEGA_CORP,
|
||||
@ -78,7 +73,7 @@ class NodeInterestRatesTest {
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
dataSource.close()
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -22,7 +22,6 @@ import net.corda.irs.contract.InterestRateSwap
|
||||
import net.corda.irs.flows.FixingFlow
|
||||
import net.corda.jackson.JacksonSupport
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import rx.Observable
|
||||
import java.security.PublicKey
|
||||
|
@ -19,7 +19,6 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.TestClock
|
||||
|
@ -17,12 +17,11 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.JDBCHashSet
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -108,7 +107,7 @@ class InMemoryMessagingNetwork(
|
||||
fun createNode(manuallyPumped: Boolean,
|
||||
executor: AffinityExecutor,
|
||||
advertisedServices: List<ServiceEntry>,
|
||||
database: Database): Pair<PeerHandle, MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
database: CordaPersistence): Pair<PeerHandle, MessagingServiceBuilder<InMemoryMessaging>> {
|
||||
check(counter >= 0) { "In memory network stopped: please recreate." }
|
||||
val builder = createNodeWithID(manuallyPumped, counter, executor, advertisedServices, database = database) as Builder
|
||||
counter++
|
||||
@ -130,7 +129,7 @@ class InMemoryMessagingNetwork(
|
||||
executor: AffinityExecutor,
|
||||
advertisedServices: List<ServiceEntry>,
|
||||
description: X500Name = X509Utilities.getX509Name("In memory node $id","London","demo@r3.com",null),
|
||||
database: Database)
|
||||
database: CordaPersistence)
|
||||
: MessagingServiceBuilder<InMemoryMessaging> {
|
||||
val peerHandle = PeerHandle(id, description)
|
||||
peersMapping[peerHandle.description] = peerHandle // Assume that the same name - the same entity in MockNetwork.
|
||||
@ -187,7 +186,7 @@ class InMemoryMessagingNetwork(
|
||||
val id: PeerHandle,
|
||||
val serviceHandles: List<ServiceHandle>,
|
||||
val executor: AffinityExecutor,
|
||||
val database: Database) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
val database: CordaPersistence) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
override fun start(): ListenableFuture<InMemoryMessaging> {
|
||||
synchronized(this@InMemoryMessagingNetwork) {
|
||||
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
|
||||
@ -304,7 +303,7 @@ class InMemoryMessagingNetwork(
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
|
||||
private val peerHandle: PeerHandle,
|
||||
private val executor: AffinityExecutor,
|
||||
private val database: Database) : SingletonSerializeAsToken(), MessagingService {
|
||||
private val database: CordaPersistence) : SingletonSerializeAsToken(), MessagingService {
|
||||
inner class Handler(val topicSession: TopicSession,
|
||||
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
|
@ -17,11 +17,9 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.InMemoryNetworkMapCache
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import java.io.Closeable
|
||||
import java.security.KeyPair
|
||||
import java.security.cert.X509Certificate
|
||||
import kotlin.concurrent.thread
|
||||
@ -34,8 +32,7 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort
|
||||
rpcAddress: NetworkHostAndPort = freeLocalHostAndPort(),
|
||||
trustRoot: X509Certificate) : AutoCloseable {
|
||||
|
||||
private val databaseWithCloseable: Pair<Closeable, Database> = configureDatabase(config.dataSourceProperties)
|
||||
val database: Database get() = databaseWithCloseable.second
|
||||
val database: CordaPersistence = configureDatabase(config.dataSourceProperties)
|
||||
val userService = RPCUserServiceImpl(config.rpcUsers)
|
||||
val monitoringService = MonitoringService(MetricRegistry())
|
||||
val identity: KeyPair = generateKeyPair()
|
||||
@ -72,7 +69,7 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort
|
||||
override fun close() {
|
||||
network.stop()
|
||||
broker.stop()
|
||||
databaseWithCloseable.first.close()
|
||||
database.close()
|
||||
executor.shutdownNow()
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user