Feature/corda 1847/remove hibernate observers (#3696)

* CORDA-1847 Fix hibernate observer issue

* CORDA-1847 Fix hibernate observer issue

* CORDA-1847 Fix hibernate observer issue

* CORDA-1847 Fix tests

* CORDA-1847 Fix tests

* CORDA-1847 Fix tests
This commit is contained in:
Tudor Malene
2018-07-31 13:52:13 +01:00
committed by GitHub
parent 4542e0cd06
commit 85caa9ee9d
28 changed files with 83 additions and 91 deletions

View File

@ -33,6 +33,8 @@ class FlowCheckpointVersionNodeStartupCheckTest {
val classes = setOf(net.corda.testMessage.MessageState::class.java,
net.corda.testMessage.MessageContract::class.java,
net.test.cordapp.v1.SendMessageFlow::class.java,
net.corda.testMessage.MessageSchema::class.java,
net.corda.testMessage.MessageSchemaV1::class.java,
net.test.cordapp.v1.Record::class.java)
val user = User("mark", "dadada", setOf(startFlow<SendMessageFlow>(), invokeRpc("vaultQuery"), invokeRpc("vaultTrack")))
}

View File

@ -42,7 +42,7 @@ class DistributedServiceTests {
invokeRpc(CordaRPCOps::stateMachinesFeed))
)
driver(DriverParameters(
extraCordappPackagesToScan = listOf("net.corda.finance.contracts"),
extraCordappPackagesToScan = listOf("net.corda.finance.contracts", "net.corda.finance.schemas"),
notarySpecs = listOf(
NotarySpec(
DUMMY_NOTARY_NAME,

View File

@ -53,7 +53,6 @@ import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.*
import net.corda.node.services.persistence.*
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.*
@ -334,7 +333,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
contractUpgradeService.start()
vaultService.start()
ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService)
val frozenTokenizableServices = tokenizableServices!!
tokenizableServices = null
@ -864,7 +862,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
protected open fun makeVaultService(keyManagementService: KeyManagementService,
services: ServicesForResolution,
database: CordaPersistence): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, database)
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService)
}
/** Load configured JVM agents */

View File

@ -4,16 +4,12 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.services.Vault
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.api.SchemaService
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.contextTransaction
import org.hibernate.FlushMode
import rx.Observable
import net.corda.nodeapi.internal.persistence.currentDBSession
/**
* Small data class bundling together a ContractState and a StateRef (as opposed to a TransactionState and StateRef
@ -25,17 +21,12 @@ data class ContractStateAndRef(val state: ContractState, val ref: StateRef)
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
*/
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver private constructor(private val config: HibernateConfiguration, private val schemaService: SchemaService) {
class PersistentStateService(private val schemaService: SchemaService) {
companion object {
private val log = contextLogger()
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration, schemaService: SchemaService): HibernateObserver {
val observer = HibernateObserver(config, schemaService)
vaultUpdates.subscribe { observer.persist(it.produced) }
return observer
}
}
private fun persist(produced: Set<StateAndRef<ContractState>>) {
fun persist(produced: Set<StateAndRef<ContractState>>) {
val stateBySchema: MutableMap<MappedSchema, MutableList<ContractStateAndRef>> = mutableMapOf()
// map all states by their referenced schemas
produced.forEach {
@ -51,15 +42,10 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
@VisibleForTesting
internal fun persistStatesWithSchema(statesAndRefs: List<ContractStateAndRef>, schema: MappedSchema) {
val sessionFactory = config.sessionFactoryForSchemas(setOf(schema))
val session = sessionFactory.withOptions().connection(contextTransaction.connection).flushMode(FlushMode.MANUAL).openSession()
session.use { thisSession ->
statesAndRefs.forEach {
val mappedObject = schemaService.generateMappedObject(it.state, schema)
mappedObject.stateRef = PersistentStateRef(it.ref)
thisSession.persist(mappedObject)
}
thisSession.flush()
statesAndRefs.forEach {
val mappedObject = schemaService.generateMappedObject(it.state, schema)
mappedObject.stateRef = PersistentStateRef(it.ref)
currentDBSession().persist(mappedObject)
}
}
}

View File

@ -14,12 +14,11 @@ import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.*
import net.corda.core.utilities.*
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.schema.PersistentStateService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.persistence.*
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject
@ -51,7 +50,8 @@ class NodeVaultService(
private val clock: Clock,
private val keyManagementService: KeyManagementService,
private val servicesForResolution: ServicesForResolution,
private val database: CordaPersistence
private val database: CordaPersistence,
private val schemaService: SchemaService
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
private val log = contextLogger()
@ -68,6 +68,7 @@ class NodeVaultService(
private val mutex = ThreadBox(InnerState())
private lateinit var criteriaBuilder: CriteriaBuilder
private val persistentStateService = PersistentStateService(schemaService)
/**
* Maintain a list of contract state interfaces to concrete types stored in the vault
@ -247,6 +248,7 @@ class NodeVaultService(
softLockReserve(uuid, stateRefs)
}
}
persistentStateService.persist(vaultUpdate.produced)
updatesPublisher.onNext(vaultUpdate)
}
}

View File

@ -81,7 +81,7 @@ class CordaRPCOpsImplTest {
@Before
fun setup() {
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts.asset"))
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts.asset", "net.corda.finance.schemas"))
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
rpc = aliceNode.rpcOps
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))

View File

@ -74,7 +74,7 @@ import kotlin.test.assertTrue
@RunWith(Parameterized::class)
class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
companion object {
private val cordappPackages = setOf("net.corda.finance.contracts")
private val cordappPackages = listOf("net.corda.finance.contracts", "net.corda.finance.schemas")
@JvmStatic
@Parameterized.Parameters(name = "Anonymous = {0}")
fun data(): Collection<Boolean> = listOf(true, false)

View File

@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch
class ServiceHubConcurrentUsageTest {
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages(Cash::class.packageName))
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.schemas", "net.corda.node.services.vault.VaultQueryExceptionsTests", Cash::class.packageName))
@After
fun stopNodes() {

View File

@ -24,6 +24,7 @@ import net.corda.finance.SWISS_FRANCS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.DummyFungibleContract
import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.schemas.SampleCashSchemaV1
import net.corda.finance.schemas.SampleCashSchemaV2
import net.corda.finance.schemas.SampleCashSchemaV3
import net.corda.finance.utils.sumCash
@ -31,7 +32,7 @@ import net.corda.node.internal.configureDatabase
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.schema.ContractStateAndRef
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.PersistentStateService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSchemaV1
@ -40,6 +41,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.core.*
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.vault.DummyDealStateSchemaV1
import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
import net.corda.testing.internal.vault.DummyLinearStateSchemaV2
import net.corda.testing.internal.vault.VaultFiller
@ -81,7 +83,7 @@ class HibernateConfigurationTest {
// Hibernate configuration objects
lateinit var hibernateConfig: HibernateConfiguration
private lateinit var hibernatePersister: HibernateObserver
private lateinit var hibernatePersister: PersistentStateService
private lateinit var sessionFactory: SessionFactory
private lateinit var entityManager: EntityManager
private lateinit var criteriaBuilder: CriteriaBuilder
@ -96,10 +98,10 @@ class HibernateConfigurationTest {
@Before
fun setUp() {
val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset")
val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset", "net.corda.finance.schemas")
bankServices = MockServices(cordappPackages, BOC.name, rigorousMock(), BOC_KEY)
issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock())
notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock())
issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock<IdentityService>())
notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock<IdentityService>())
notary = notaryServices.myInfo.singleIdentity()
val dataSourceProps = makeTestDataSourceProperties()
val identityService = rigorousMock<IdentityService>().also { mock ->
@ -109,7 +111,7 @@ class HibernateConfigurationTest {
doReturn(it.party).whenever(mock).wellKnownPartyFromX500Name(it.name)
}
}
val schemaService = NodeSchemaService()
val schemaService = NodeSchemaService(extraSchemas = setOf(CashSchemaV1, SampleCashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, DummyLinearStateSchemaV1, DummyLinearStateSchemaV2, DummyDealStateSchemaV1))
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService)
database.transaction {
hibernateConfig = database.hibernateConfig
@ -118,7 +120,7 @@ class HibernateConfigurationTest {
services = object : MockServices(cordappPackages, BOB_NAME, rigorousMock<IdentityServiceInternal>().also {
doNothing().whenever(it).justVerifyAndRegisterIdentity(argThat { name == BOB_NAME })
}, generateKeyPair(), dummyNotary.keyPair) {
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database).apply { start() }
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService).apply { start() }
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)
@ -130,7 +132,7 @@ class HibernateConfigurationTest {
override fun jdbcSession() = database.createSession()
}
vaultFiller = VaultFiller(services, dummyNotary, notary, ::Random)
hibernatePersister = HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
hibernatePersister = PersistentStateService(schemaService)
}
identity = services.myInfo.singleIdentity()

View File

@ -14,25 +14,28 @@ import net.corda.core.schemas.QueryableState
import net.corda.node.services.api.SchemaService
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.testing.internal.LogHelper
import net.corda.testing.core.TestIdentity
import net.corda.testing.contracts.DummyContract
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.subjects.PublishSubject
import kotlin.test.assertEquals
class HibernateObserverTests {
class PersistentStateServiceTests {
@Before
fun setUp() {
LogHelper.setLevel(HibernateObserver::class)
LogHelper.setLevel(PersistentStateService::class)
}
@After
fun cleanUp() {
LogHelper.reset(HibernateObserver::class)
LogHelper.reset(PersistentStateService::class)
}
class TestState : QueryableState {
@ -51,9 +54,8 @@ class HibernateObserverTests {
@Test
fun `test child objects are persisted`() {
val testSchema = TestSchema
val rawUpdatesPublisher = PublishSubject.create<Vault.Update<ContractState>>()
val schemaService = object : SchemaService {
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = emptyMap()
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = mapOf(testSchema to SchemaService.SchemaOptions())
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> = setOf(testSchema)
@ -64,11 +66,12 @@ class HibernateObserverTests {
return parent
}
}
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, schemaService)
HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig, schemaService)
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock(), rigorousMock(), schemaService)
val persistentStateService = PersistentStateService(schemaService)
database.transaction {
val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
persistentStateService.persist(setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0))))
currentDBSession().flush()
val parentRowCountResult = connection.prepareStatement("select count(*) from Parents").executeQuery()
parentRowCountResult.next()
val parentRows = parentRowCountResult.getInt(1)

View File

@ -11,10 +11,7 @@ import net.corda.core.identity.*
import net.corda.core.internal.NotaryChangeTransactionBuilder
import net.corda.core.internal.packageName
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.QueryCriteria.*
@ -97,8 +94,8 @@ class NodeVaultServiceTest {
vaultFiller = VaultFiller(services, dummyNotary)
// This is safe because MockServices only ever have a single identity
identity = services.myInfo.singleIdentityAndCert()
issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock())
bocServices = MockServices(cordappPackages, bankOfCorda, rigorousMock())
issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock<IdentityService>())
bocServices = MockServices(cordappPackages, bankOfCorda, rigorousMock<IdentityService>())
services.identityService.verifyAndRegisterIdentity(DUMMY_CASH_ISSUER_IDENTITY)
services.identityService.verifyAndRegisterIdentity(BOC_IDENTITY)
}

View File

@ -23,8 +23,7 @@ class VaultQueryExceptionsTests : VaultQueryParties by rule {
override val cordappPackages = listOf(
"net.corda.testing.contracts",
"net.corda.finance.contracts",
CashSchemaV1::class.packageName,
DummyLinearStateSchemaV1::class.packageName) - SampleCashSchemaV3::class.packageName
DummyLinearStateSchemaV1::class.packageName)
}
}
@ -43,9 +42,6 @@ class VaultQueryExceptionsTests : VaultQueryParties by rule {
@Test
fun `query attempting to use unregistered schema`() {
database.transaction {
vaultFiller.fillWithSomeTestCash(100.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER)
vaultFiller.fillWithSomeTestCash(100.POUNDS, notaryServices, 1, DUMMY_CASH_ISSUER)
vaultFiller.fillWithSomeTestCash(100.SWISS_FRANCS, notaryServices, 1, DUMMY_CASH_ISSUER)
// CashSchemaV3 NOT registered with NodeSchemaService
val logicalExpression = builder { SampleCashSchemaV3.PersistentCashState::currency.equal(GBP.currencyCode) }
val criteria = VaultCustomQueryCriteria(logicalExpression)

View File

@ -24,6 +24,7 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.VaultServiceInternal
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.core.singleIdentity

View File

@ -45,7 +45,7 @@ import kotlin.test.fail
class VaultWithCashTest {
private companion object {
val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset", CashSchemaV1::class.packageName)
val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset", CashSchemaV1::class.packageName, "net.corda.core.contracts")
val BOB = TestIdentity(BOB_NAME, 80).party
val dummyCashIssuer = TestIdentity(CordaX500Name("Snake Oil Issuer", "London", "GB"), 10)
val DUMMY_CASH_ISSUER = dummyCashIssuer.ref(1)
@ -83,7 +83,7 @@ class VaultWithCashTest {
services = databaseAndServices.second
vaultFiller = VaultFiller(services, dummyNotary)
issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock(), MEGA_CORP_KEY)
notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock())
notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock<IdentityService>())
notary = notaryServices.myInfo.legalIdentitiesAndCerts.single().party
}