Expose JPA to flows (#4140)

* First pass
* Update test.
* Address review comments.
* Added docs and kdocs.
* Clean-up.
* Add extra test.
* Changes to docsite.
* Added try/catch block as recommended by Andras.
* Removed try catch block. It's not required as the checkpoint serialiser deals with this.
* Re-used existing DB session instead of creating a new session.
* Entity manager auto flushes.
* Added java friendly api.
* Addressed review comments.
This commit is contained in:
Roger Willis 2018-11-09 17:47:36 +00:00 committed by GitHub
parent 65b8cbe9b1
commit 4684259970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 267 additions and 0 deletions

View File

@ -18,6 +18,8 @@ import net.corda.core.transactions.TransactionBuilder
import java.security.PublicKey import java.security.PublicKey
import java.sql.Connection import java.sql.Connection
import java.time.Clock import java.time.Clock
import java.util.function.Consumer
import javax.persistence.EntityManager
/** /**
* Subset of node services that are used for loading transactions from the wire into fully resolved, looked up * Subset of node services that are used for loading transactions from the wire into fully resolved, looked up
@ -358,6 +360,28 @@ interface ServiceHub : ServicesForResolution {
*/ */
fun jdbcSession(): Connection fun jdbcSession(): Connection
/**
* Exposes the Java Persistence API (JPA) to flows via a restricted [EntityManager]. This method can be used to
* persist and query entities which inherit from [MappedSchema]. This is particularly useful if off-ledger data
* needs to be kept in conjunction with on-ledger state data.
*
* NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda.
*
* @param block a lambda function with access to an [EntityManager].
*/
fun <T : Any> withEntityManager(block: EntityManager.() -> T): T
/**
* Exposes the Java Persistence API (JPA) to flows via a restricted [EntityManager]. This method can be used to
* persist and query entities which inherit from [MappedSchema]. This is particularly useful if off-ledger data
* needs to be kept in conjunction with on-ledger state data.
*
* NOTE: Suspendable flow operations such as send, receive, subFlow and sleep, cannot be called within the lambda.
*
* @param block a lambda function with access to an [EntityManager].
*/
fun withEntityManager(block: Consumer<EntityManager>)
/** /**
* Allows the registration of a callback that may inform services when the app is shutting down. * Allows the registration of a callback that may inform services when the app is shutting down.
* *

View File

@ -155,3 +155,101 @@ which is then referenced within a custom flow:
For examples on testing ``@CordaService`` implementations, see the oracle example :doc:`here <oracles>` For examples on testing ``@CordaService`` implementations, see the oracle example :doc:`here <oracles>`
JPA Support
-----------
In addition to ``jdbcSession``, ``ServiceHub`` also exposes the Java Persistence API to flows via the ``withEntityManager``
method. This method can be used to persist and query entities which inherit from ``MappedSchema``. This is particularly
useful if off-ledger data must be maintained in conjunction with on-ledger state data.
.. note:: Your entity must be included as a mappedType in as part of a MappedSchema for it to be added to Hibernate
as a custom schema. See Samples below.
The code snippet below defines a ``PersistentFoo`` type inside ``FooSchemaV1``. Note that ``PersistentFoo`` is added to
a list of mapped types which is passed to ``MappedSChema``. This is exactly how state schemas are defined, except that
the entity in this case should not subclass ``PersistentState`` (as it is not a state object). See examples:
.. container:: codeset
.. sourcecode:: java
public class FooSchema {}
@CordaSerializable
public class FooSchemaV1 extends MappedSchema {
FooSchemaV1() {
super(FooSchema.class, 1, ImmutableList.of(PersistentFoo.class));
}
@Entity
@Table(name = "foos")
class PersistentFoo implements Serializable {
@Id
@Column(name = "foo_id")
String fooId;
@Column(name = "foo_data")
String fooData;
}
}
.. sourcecode:: kotlin
object FooSchema
object FooSchemaV1 : MappedSchema(schemaFamily = FooSchema.javaClass, version = 1, mappedTypes = listOf(PersistentFoo::class.java)) {
@Entity
@Table(name = "foos")
class PersistentFoo(@Id @Column(name = "foo_id") var fooId: String, @Column(name = "foo_data") var fooData: String) : Serializable
}
Instances of ``PersistentFoo`` can be persisted inside a flow as follows:
.. container:: codeset
.. sourcecode:: java
PersistentFoo foo = new PersistentFoo(new UniqueIdentifier().getId().toString(), "Bar");
node.getServices().withEntityManager(entityManager -> {
entityManager.persist(foo);
entityManager.flush();
return null;
});
.. sourcecode:: kotlin
val foo = FooSchemaV1.PersistentFoo(UniqueIdentifier().id.toString(), "Bar")
serviceHub.withEntityManager {
persist(foo)
}
And retrieved via a query, as follows:
.. container:: codeset
.. sourcecode:: java
node.getServices().withEntityManager((EntityManager entityManager) -> {
CriteriaQuery<PersistentFoo> query = entityManager.getCriteriaBuilder().createQuery(PersistentFoo.class);
Root<PersistentFoo> type = query.from(PersistentFoo.class);
query.select(type);
return entityManager.createQuery(query).getResultList();
});
.. sourcecode:: kotlin
val result: MutableList<FooSchemaV1.PersistentFoo> = services.withEntityManager {
val query = criteriaBuilder.createQuery(FooSchemaV1.PersistentFoo::class.java)
val type = query.from(FooSchemaV1.PersistentFoo::class.java)
query.select(type)
createQuery(query).resultList
}
Please note that suspendable flow operations such as:
* ``FlowSession.send``
* ``FlowSession.receive``
* ``FlowLogic.receiveAll``
* ``FlowLogic.sleep``
* ``FlowLogic.subFlow``
Cannot be used within the lambda function passed to ``withEntityManager``.

View File

@ -7,6 +7,7 @@ import org.hibernate.Transaction
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.sql.Connection import java.sql.Connection
import java.util.* import java.util.*
import javax.persistence.EntityManager
fun currentDBSession(): Session = contextTransaction.session fun currentDBSession(): Session = contextTransaction.session
private val _contextTransaction = ThreadLocal<DatabaseTransaction>() private val _contextTransaction = ThreadLocal<DatabaseTransaction>()
@ -59,6 +60,12 @@ class DatabaseTransaction(
session session
} }
// Returns a delegate which overrides certain operations that we do not want CorDapp developers to call.
val restrictedEntityManager: RestrictedEntityManager by lazy {
val entityManager = session as EntityManager
RestrictedEntityManager(entityManager)
}
val session: Session by sessionDelegate val session: Session by sessionDelegate
private lateinit var hibernateTransaction: Transaction private lateinit var hibernateTransaction: Transaction
@ -101,3 +108,4 @@ class DatabaseTransaction(
boundary.filter { !it.success }.subscribe { callback() } boundary.filter { !it.success }.subscribe { callback() }
} }
} }

View File

@ -0,0 +1,19 @@
package net.corda.nodeapi.internal.persistence
import javax.persistence.EntityManager
/**
* A delegate of [EntityManager] which disallows some operations.
*/
class RestrictedEntityManager(private val delegate: EntityManager) : EntityManager by delegate {
override fun close() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
override fun clear() {
throw UnsupportedOperationException("This method cannot be called via ServiceHub.withEntityManager.")
}
// TODO: Figure out which other methods on EntityManager need to be blocked?
}

View File

@ -105,6 +105,8 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.MINUTES import java.util.concurrent.TimeUnit.MINUTES
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import java.util.function.Consumer
import javax.persistence.EntityManager
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
/** /**
@ -954,6 +956,14 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override fun jdbcSession(): Connection = database.createSession() override fun jdbcSession(): Connection = database.createSession()
override fun <T : Any> withEntityManager(block: EntityManager.() -> T): T {
return block(contextTransaction.restrictedEntityManager)
}
override fun withEntityManager(block: Consumer<EntityManager>) {
block.accept(contextTransaction.restrictedEntityManager)
}
// allows services to register handlers to be informed when the node stop method is called // allows services to register handlers to be informed when the node stop method is called
override fun registerUnloadHandler(runOnStop: () -> Unit) { override fun registerUnloadHandler(runOnStop: () -> Unit) {
this@AbstractNode.runOnStop += runOnStop this@AbstractNode.runOnStop += runOnStop

View File

@ -0,0 +1,89 @@
package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import com.esotericsoftware.kryo.KryoException
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogic.Companion.sleep
import net.corda.core.identity.CordaX500Name
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CordaSerializable
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.TestIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestIdentityService
import org.junit.BeforeClass
import org.junit.ClassRule
import org.junit.Test
import java.io.Serializable
import java.time.Duration
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class ExposeJpaToFlowsTests {
object FooSchema
object FooSchemaV1 : MappedSchema(schemaFamily = FooSchema.javaClass, version = 1, mappedTypes = listOf(PersistentFoo::class.java)) {
@Entity
@Table(name = "foos")
class PersistentFoo(@Id @Column(name = "foo_id") var fooId: String, @Column(name = "foo_data") var fooData: String) : Serializable
}
val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
val cordapps = listOf("net.corda.node.services.persistence")
val databaseAndServices = MockServices.makeTestDatabaseAndMockServices(
cordappPackages = cordapps,
identityService = makeTestIdentityService(myself.identity),
initialIdentity = myself,
networkParameters = testNetworkParameters(minimumPlatformVersion = 4)
)
val services: MockServices = databaseAndServices.second
val database: CordaPersistence = databaseAndServices.first
@Test
fun `can persist and query custom entities`() {
val foo = FooSchemaV1.PersistentFoo(UniqueIdentifier().id.toString(), "Bar")
// Persist the foo.
val result: MutableList<FooSchemaV1.PersistentFoo> = database.transaction {
services.withEntityManager {
// Persist.
persist(foo)
// Query.
val query = criteriaBuilder.createQuery(FooSchemaV1.PersistentFoo::class.java)
val type = query.from(FooSchemaV1.PersistentFoo::class.java)
query.select(type)
createQuery(query).resultList
}
}
assertEquals("Bar", result.single().fooData)
}
@Test
fun `can't perform suspendable operations inside withEntityManager`() {
val mockNet = MockNetwork(cordapps)
val mockNode = mockNet.createNode()
assertFailsWith(KryoException::class) {
mockNode.startFlow(object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.withEntityManager {
val session = initiateFlow(myself.party)
session.send("Ooohhh eee oooh ah ah ting tang walla walla bing bang!")
}
}
})
}
mockNet.stopNodes()
}
}

View File

@ -29,6 +29,7 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.DEV_ROOT_CA import net.corda.testing.internal.DEV_ROOT_CA
@ -40,6 +41,8 @@ import java.security.KeyPair
import java.sql.Connection import java.sql.Connection
import java.time.Clock import java.time.Clock
import java.util.* import java.util.*
import java.util.function.Consumer
import javax.persistence.EntityManager
/** /**
* Returns a simple [InMemoryIdentityService] containing the supplied [identities]. * Returns a simple [InMemoryIdentityService] containing the supplied [identities].
@ -121,6 +124,14 @@ open class MockServices private constructor(
} }
override fun jdbcSession(): Connection = database.createSession() override fun jdbcSession(): Connection = database.createSession()
override fun <T : Any> withEntityManager(block: EntityManager.() -> T): T {
return block(contextTransaction.restrictedEntityManager)
}
override fun withEntityManager(block: Consumer<EntityManager>) {
return block.accept(contextTransaction.restrictedEntityManager)
}
} }
} }
return Pair(database, mockService) return Pair(database, mockService)
@ -268,6 +279,14 @@ open class MockServices private constructor(
override fun jdbcSession(): Connection = throw UnsupportedOperationException() override fun jdbcSession(): Connection = throw UnsupportedOperationException()
override fun <T : Any> withEntityManager(block: EntityManager.() -> T): T {
throw UnsupportedOperationException()
}
override fun withEntityManager(block: Consumer<EntityManager>) {
throw UnsupportedOperationException()
}
override fun registerUnloadHandler(runOnStop: () -> Unit) = throw UnsupportedOperationException() override fun registerUnloadHandler(runOnStop: () -> Unit) = throw UnsupportedOperationException()
/** Add the given package name to the list of packages which will be scanned for cordapp contract verification code */ /** Add the given package name to the list of packages which will be scanned for cordapp contract verification code */