Hibernate ORM implementation for states.

This commit is contained in:
rick.parker
2016-09-27 15:17:27 +01:00
parent 07e528c659
commit ec975b0426
19 changed files with 540 additions and 31 deletions

View File

@ -40,6 +40,8 @@ import com.r3corda.node.services.network.NetworkMapService.RegistrationResponse
import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.services.network.PersistentNetworkMapService
import com.r3corda.node.services.persistence.*
import com.r3corda.node.services.schema.HibernateObserver
import com.r3corda.node.services.schema.NodeSchemaService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
@ -105,6 +107,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
override val identityService: IdentityService get() = identity
override val schedulerService: SchedulerService get() = scheduler
override val clock: Clock = platformClock
override val schemaService: SchemaService get() = schemas
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
@ -147,6 +150,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
lateinit var schemas: SchemaService
val customServices: ArrayList<Any> = ArrayList()
protected val runOnStop: ArrayList<Runnable> = ArrayList()
lateinit var database: Database
@ -190,6 +194,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
checkpointStorage = storageServices.second
netMapCache = InMemoryNetworkMapCache()
net = makeMessagingService()
schemas = makeSchemaService()
vault = makeVaultService()
identity = makeIdentityService()
@ -237,6 +242,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
// Add vault observers
CashBalanceAsMetricsObserver(services)
ScheduledActivityObserver(services)
HibernateObserver(services)
checkpointStorage.forEach {
isPreviousCheckpointsPresent = true
@ -407,6 +413,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
// TODO: sort out ordering of open & protected modifiers of functions in this class.
protected open fun makeVaultService(): VaultService = NodeVaultService(services)
protected open fun makeSchemaService(): SchemaService = NodeSchemaService()
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()

View File

@ -0,0 +1,34 @@
package com.r3corda.node.services.api
import com.r3corda.core.schemas.MappedSchema
import com.r3corda.core.schemas.PersistentState
import com.r3corda.core.schemas.QueryableState
//DOCSTART SchemaService
/**
* A configuration and customisation point for Object Relational Mapping of contract state objects.
*/
interface SchemaService {
/**
* Represents any options configured on the node for a schema.
*/
data class SchemaOptions(val databaseSchema: String?, val tablePrefix: String?)
/**
* Options configured for this node's schemas. A missing entry for a schema implies all properties are null.
*/
val schemaOptions: Map<MappedSchema, SchemaOptions>
/**
* Given a state, select schemas to map it to that are supported by [generateMappedObject] and that are configured
* for this node.
*/
fun selectSchemas(state: QueryableState): Iterable<MappedSchema>
/**
* Map a state to a [PersistentState] for the given schema, either via direct support from the state
* or via custom logic in this service.
*/
fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState
}
//DOCEND SchemaService

View File

@ -40,6 +40,7 @@ private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java)
abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
abstract val schemaService: SchemaService
abstract override val networkService: MessagingServiceInternal

View File

@ -0,0 +1,124 @@
package com.r3corda.node.services.schema
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.schemas.MappedSchema
import com.r3corda.core.schemas.PersistentStateRef
import com.r3corda.core.schemas.QueryableState
import com.r3corda.core.utilities.debug
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.ServiceHubInternal
import kotlinx.support.jdk7.use
import org.hibernate.SessionFactory
import org.hibernate.boot.model.naming.Identifier
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
import org.hibernate.cfg.Configuration
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
import org.hibernate.service.UnknownUnwrapTypeException
import org.jetbrains.exposed.sql.transactions.TransactionManager
import java.sql.Connection
import java.util.concurrent.ConcurrentHashMap
/**
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
*/
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver(services: ServiceHubInternal) {
companion object {
val logger = loggerFor<HibernateObserver>()
}
val schemaService = services.schemaService
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
init {
services.vaultService.updates.subscribe { persist(it.produced) }
}
private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {
return sessionFactories.computeIfAbsent(schema, { makeSessionFactoryForSchema(it) })
}
private fun makeSessionFactoryForSchema(schema: MappedSchema): SessionFactory {
logger.info("Creating session factory for schema $schema")
// We set a connection provider as the auto schema generation requires it. The auto schema generation will not
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
val config = Configuration().setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name)
.setProperty("hibernate.hbm2ddl.auto", "update")
.setProperty("hibernate.show_sql", "true")
.setProperty("hibernate_format_sql", "true")
val options = schemaService.schemaOptions[schema]
val databaseSchema = options?.databaseSchema
if (databaseSchema != null) {
logger.debug { "Database schema = $databaseSchema" }
config.setProperty("hibernate.default_schema", databaseSchema)
}
val tablePrefix = options?.tablePrefix
if (tablePrefix != null) {
logger.debug { "Table prefix = $tablePrefix" }
config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
val default = super.toPhysicalTableName(name, context)
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
}
})
}
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
val sessionFactory = config.buildSessionFactory()
logger.info("Created session factory for schema $schema")
return sessionFactory
}
private fun persist(produced: Set<StateAndRef<ContractState>>) {
produced.forEach { persistState(it) }
}
private fun persistState(stateAndRef: StateAndRef<ContractState>) {
val state = stateAndRef.state.data
if (state is QueryableState) {
logger.debug { "Asked to persist state ${stateAndRef.ref}" }
schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
}
}
private fun persistStateWithSchema(state: QueryableState, stateRef: StateRef, schema: MappedSchema) {
val sessionFactory = sessionFactoryForSchema(schema)
val session = sessionFactory.openStatelessSession(TransactionManager.current().connection)
session.use {
val mappedObject = schemaService.generateMappedObject(state, schema)
mappedObject.stateRef = PersistentStateRef(stateRef)
session.insert(mappedObject)
}
}
// Supply Hibernate with connections from our underlying Exposed database integration. Only used
// during schema creation / update.
class NodeDatabaseConnectionProvider : ConnectionProvider {
override fun closeConnection(conn: Connection) {
val tx = TransactionManager.current()
tx.commit()
tx.close()
}
override fun supportsAggressiveRelease(): Boolean = true
override fun getConnection(): Connection {
val tx = TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ)
return tx.connection
}
override fun <T : Any?> unwrap(unwrapType: Class<T>?): T {
if (unwrapType == NodeDatabaseConnectionProvider::class.java) {
return unwrapType.cast(this)
} else {
throw UnknownUnwrapTypeException(unwrapType)
}
}
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = (unwrapType == NodeDatabaseConnectionProvider::class.java)
}
}

View File

@ -0,0 +1,29 @@
package com.r3corda.node.services.schema
import com.r3corda.core.schemas.MappedSchema
import com.r3corda.core.schemas.PersistentState
import com.r3corda.core.schemas.QueryableState
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.node.services.api.SchemaService
/**
* Most basic implementation of [SchemaService].
*
* TODO: support loading schema options from node configuration.
* TODO: support configuring what schemas are to be selected for persistence.
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
*/
class NodeSchemaService : SchemaService, SingletonSerializeAsToken() {
// Currently does not support configuring schema options.
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = emptyMap()
// Currently returns all schemas supported by the state, with no filtering or enrichment.
override fun selectSchemas(state: QueryableState): Iterable<MappedSchema> {
return state.supportedSchemas()
}
// Because schema is always one supported by the state, just delegate.
override fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState {
return state.generateMappedObject(schema)
}
}

View File

@ -9,6 +9,8 @@ import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.Vault
import com.r3corda.core.node.services.VaultService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.parseAsHex
import com.r3corda.core.serialization.toHexString
import com.r3corda.core.transactions.WireTransaction
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
@ -16,7 +18,6 @@ import com.r3corda.node.utilities.AbstractJDBCHashSet
import com.r3corda.node.utilities.JDBCHashedTable
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
@ -39,16 +40,16 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
}
private object StatesSetTable : JDBCHashedTable("vault_unconsumed_states") {
val txhash = binary("transaction_id", 32)
val txhash = varchar("transaction_id", 64)
val index = integer("output_index")
}
private val mutex = ThreadBox(object {
val unconsumedStates = object : AbstractJDBCHashSet<StateRef, StatesSetTable>(StatesSetTable) {
override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash]), it[table.index])
override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash].parseAsHex()), it[table.index])
override fun addElementToInsert(it: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) {
it[table.txhash] = entry.txhash.bits
it[table.txhash] = entry.txhash.bits.toHexString()
it[table.index] = entry.index
}
}

View File

@ -6,7 +6,7 @@ keyStorePassword = "cordacadevpass"
trustStorePassword = "trustpass"
dataSourceProperties = {
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
"dataSource.url" = "jdbc:h2:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true"
"dataSource.url" = "jdbc:h2:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true;WRITE_DELAY=0"
"dataSource.user" = sa
"dataSource.password" = ""
}

View File

@ -18,6 +18,7 @@ import com.r3corda.core.utilities.DUMMY_NOTARY
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
import com.r3corda.core.utilities.LogHelper
import com.r3corda.core.utilities.TEST_TX_TIME
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.persistence.PerFileTransactionStorage
@ -88,12 +89,12 @@ class TwoPartyTradeProtocolTests {
aliceNode.disableDBCloseOnStop()
bobNode.disableDBCloseOnStop()
bobNode.services.fillWithSomeTestCash(2000.DOLLARS)
databaseTransaction(bobNode.database) {
bobNode.services.fillWithSomeTestCash(2000.DOLLARS)
}
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey, notaryNode.storage.myLegalIdentityKey)
insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey, notaryNode.storage.myLegalIdentityKey)
val (bobPsm, aliceResult) = runBuyerAndSeller("alice's paper".outputStateAndRef())
// TODO: Verify that the result was inserted into the transaction database.
@ -126,11 +127,12 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() // Clear network map registration messages
bobNode.services.fillWithSomeTestCash(2000.DOLLARS)
databaseTransaction(bobNode.database) {
bobNode.services.fillWithSomeTestCash(2000.DOLLARS)
}
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey)
val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerResult
// Everything is on this thread so we can now step through the protocol one step at a time.
@ -227,10 +229,10 @@ class TwoPartyTradeProtocolTests {
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode.services)
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode)
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey)
net.runNetwork() // Clear network map registration messages
@ -318,10 +320,10 @@ class TwoPartyTradeProtocolTests {
val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray()))
val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second
insertFakeTransactions(bobsFakeCash, bobNode.services)
insertFakeTransactions(bobsFakeCash, bobNode)
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey)
net.runNetwork() // Clear network map registration messages
@ -407,8 +409,8 @@ class TwoPartyTradeProtocolTests {
val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.storage.myLegalIdentity.owningKey,
1200.DOLLARS `issued by` issuer, null).second
insertFakeTransactions(bobsBadCash, bobNode.services, bobNode.storage.myLegalIdentityKey, bobNode.storage.myLegalIdentityKey)
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
insertFakeTransactions(bobsBadCash, bobNode, bobNode.storage.myLegalIdentityKey, bobNode.storage.myLegalIdentityKey)
insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey)
net.runNetwork() // Clear network map registration messages
@ -435,15 +437,17 @@ class TwoPartyTradeProtocolTests {
private fun insertFakeTransactions(
wtxToSign: List<WireTransaction>,
services: ServiceHub,
node: AbstractNode,
vararg extraKeys: KeyPair): Map<SecureHash, SignedTransaction> {
val signed: List<SignedTransaction> = signAll(wtxToSign, extraKeys.toList() + DUMMY_CASH_ISSUER_KEY)
services.recordTransactions(signed)
val validatedTransactions = services.storageService.validatedTransactions
if (validatedTransactions is RecordingTransactionStorage) {
validatedTransactions.records.clear()
return databaseTransaction(node.database) {
val signed: List<SignedTransaction> = signAll(wtxToSign, extraKeys.toList() + DUMMY_CASH_ISSUER_KEY)
node.services.recordTransactions(signed)
val validatedTransactions = node.services.storageService.validatedTransactions
if (validatedTransactions is RecordingTransactionStorage) {
validatedTransactions.records.clear()
}
return@databaseTransaction signed.associateBy { it.id }
}
return signed.associateBy { it.id }
}
private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.fillUpForBuyer(

View File

@ -12,7 +12,9 @@ import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.api.SchemaService
import com.r3corda.node.services.persistence.DataVending
import com.r3corda.node.services.schema.NodeSchemaService
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.testing.node.MockNetworkMapCache
@ -31,7 +33,8 @@ open class MockServiceHubInternal(
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory()
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory(),
val schemas: SchemaService? = NodeSchemaService()
) : ServiceHubInternal() {
override val vaultService: VaultService = customVault ?: InMemoryVaultService(this)
override val keyManagementService: KeyManagementService
@ -52,6 +55,8 @@ open class MockServiceHubInternal(
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory
get() = protocolFactory ?: throw UnsupportedOperationException()
override val schemaService: SchemaService
get() = schemas ?: throw UnsupportedOperationException()
// We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions()
private val txStorageService: TxWritableStorageService