From ec975b0426c981ed171559489c8ef09aee36a27d Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Tue, 27 Sep 2016 15:17:27 +0100 Subject: [PATCH] Hibernate ORM implementation for states. --- .../com/r3corda/contracts/CommercialPaper.kt | 32 ++++- .../com/r3corda/contracts/asset/Cash.kt | 29 +++- .../com/r3corda/schemas/CashSchemaV1.kt | 40 ++++++ .../schemas/CommercialPaperSchemaV1.kt | 47 +++++++ core/build.gradle | 3 + .../r3corda/core/schemas/PersistentTypes.kt | 66 ++++++++++ .../r3corda/core/serialization/ByteArrays.kt | 4 +- docs/source/index.rst | 1 + docs/source/persistence.rst | 87 ++++++++++++ node/build.gradle | 4 + .../com/r3corda/node/internal/AbstractNode.kt | 8 ++ .../node/services/api/SchemaService.kt | 34 +++++ .../node/services/api/ServiceHubInternal.kt | 1 + .../node/services/schema/HibernateObserver.kt | 124 ++++++++++++++++++ .../node/services/schema/NodeSchemaService.kt | 29 ++++ .../node/services/vault/NodeVaultService.kt | 9 +- node/src/main/resources/reference.conf | 2 +- .../messaging/TwoPartyTradeProtocolTests.kt | 44 ++++--- .../node/services/MockServiceHubInternal.kt | 7 +- 19 files changed, 540 insertions(+), 31 deletions(-) create mode 100644 contracts/src/main/kotlin/com/r3corda/schemas/CashSchemaV1.kt create mode 100644 contracts/src/main/kotlin/com/r3corda/schemas/CommercialPaperSchemaV1.kt create mode 100644 core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt create mode 100644 docs/source/persistence.rst create mode 100644 node/src/main/kotlin/com/r3corda/node/services/api/SchemaService.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/schema/NodeSchemaService.kt diff --git a/contracts/src/main/kotlin/com/r3corda/contracts/CommercialPaper.kt b/contracts/src/main/kotlin/com/r3corda/contracts/CommercialPaper.kt index 5fe1763cd3..48f0105259 100644 --- a/contracts/src/main/kotlin/com/r3corda/contracts/CommercialPaper.kt +++ b/contracts/src/main/kotlin/com/r3corda/contracts/CommercialPaper.kt @@ -6,13 +6,21 @@ import com.r3corda.contracts.asset.InsufficientBalanceException import com.r3corda.contracts.asset.sumCashBy import com.r3corda.contracts.clause.AbstractIssue import com.r3corda.core.contracts.* -import com.r3corda.core.contracts.clauses.* +import com.r3corda.core.contracts.clauses.AnyComposition +import com.r3corda.core.contracts.clauses.Clause +import com.r3corda.core.contracts.clauses.GroupClauseVerifier +import com.r3corda.core.contracts.clauses.verifyClause import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.crypto.toBase58String import com.r3corda.core.crypto.toStringShort import com.r3corda.core.random63BitValue +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import com.r3corda.core.schemas.QueryableState import com.r3corda.core.transactions.TransactionBuilder import com.r3corda.core.utilities.Emoji +import com.r3corda.schemas.CommercialPaperSchemaV1 import java.security.PublicKey import java.time.Instant import java.util.* @@ -58,7 +66,7 @@ class CommercialPaper : Contract { override val owner: PublicKey, val faceValue: Amount>, val maturityDate: Instant - ) : OwnableState, ICommercialPaperState { + ) : OwnableState, QueryableState, ICommercialPaperState { override val contract = CP_PROGRAM_ID override val participants: List get() = listOf(owner) @@ -75,6 +83,26 @@ class CommercialPaper : Contract { override fun withIssuance(newIssuance: PartyAndReference): ICommercialPaperState = copy(issuance = newIssuance) override fun withFaceValue(newFaceValue: Amount>): ICommercialPaperState = copy(faceValue = newFaceValue) override fun withMaturityDate(newMaturityDate: Instant): ICommercialPaperState = copy(maturityDate = newMaturityDate) + + /** Object Relational Mapping support. */ + override fun supportedSchemas(): Iterable = listOf(CommercialPaperSchemaV1) + + /** Object Relational Mapping support. */ + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when (schema) { + is CommercialPaperSchemaV1 -> CommercialPaperSchemaV1.PersistentCommericalPaperState( + issuanceParty = this.issuance.party.owningKey.toBase58String(), + issuanceRef = this.issuance.reference.bits, + owner = this.owner.toBase58String(), + maturity = this.maturityDate, + faceValue = this.faceValue.quantity, + currency = this.faceValue.token.product.currencyCode, + faceValueIssuerParty = this.faceValue.token.issuer.party.owningKey.toBase58String(), + faceValueIssuerRef = this.faceValue.token.issuer.reference.bits + ) + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } } interface Clauses { diff --git a/contracts/src/main/kotlin/com/r3corda/contracts/asset/Cash.kt b/contracts/src/main/kotlin/com/r3corda/contracts/asset/Cash.kt index 452549928e..d4fb1db7e9 100644 --- a/contracts/src/main/kotlin/com/r3corda/contracts/asset/Cash.kt +++ b/contracts/src/main/kotlin/com/r3corda/contracts/asset/Cash.kt @@ -4,11 +4,18 @@ import com.r3corda.contracts.clause.AbstractConserveAmount import com.r3corda.contracts.clause.AbstractIssue import com.r3corda.contracts.clause.NoZeroSizedOutputs import com.r3corda.core.contracts.* -import com.r3corda.core.contracts.clauses.* +import com.r3corda.core.contracts.clauses.AllComposition +import com.r3corda.core.contracts.clauses.FirstComposition +import com.r3corda.core.contracts.clauses.GroupClauseVerifier +import com.r3corda.core.contracts.clauses.verifyClause import com.r3corda.core.crypto.* import com.r3corda.core.node.services.Vault +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import com.r3corda.core.schemas.QueryableState import com.r3corda.core.transactions.TransactionBuilder import com.r3corda.core.utilities.Emoji +import com.r3corda.schemas.CashSchemaV1 import java.math.BigInteger import java.security.PublicKey import java.util.* @@ -79,7 +86,7 @@ class Cash : OnLedgerAsset() { /** There must be a MoveCommand signed by this key to claim the amount. */ override val owner: PublicKey, override val encumbrance: Int? = null - ) : FungibleAsset { + ) : FungibleAsset, QueryableState { constructor(deposit: PartyAndReference, amount: Amount, owner: PublicKey) : this(Amount(amount.quantity, Issued(deposit, amount.token)), owner) @@ -95,6 +102,24 @@ class Cash : OnLedgerAsset() { override fun toString() = "${Emoji.bagOfCash}Cash($amount at $deposit owned by ${owner.toStringShort()})" override fun withNewOwner(newOwner: PublicKey) = Pair(Commands.Move(), copy(owner = newOwner)) + + /** Object Relational Mapping support. */ + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when (schema) { + is CashSchemaV1 -> CashSchemaV1.PersistentCashState( + encumbrance = this.encumbrance, + owner = this.owner.toBase58String(), + pennies = this.amount.quantity, + currency = this.amount.token.product.currencyCode, + issuerParty = this.amount.token.issuer.party.owningKey.toBase58String(), + issuerRef = this.amount.token.issuer.reference.bits + ) + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } + + /** Object Relational Mapping support. */ + override fun supportedSchemas(): Iterable = listOf(CashSchemaV1) } // Just for grouping diff --git a/contracts/src/main/kotlin/com/r3corda/schemas/CashSchemaV1.kt b/contracts/src/main/kotlin/com/r3corda/schemas/CashSchemaV1.kt new file mode 100644 index 0000000000..75f3982dec --- /dev/null +++ b/contracts/src/main/kotlin/com/r3corda/schemas/CashSchemaV1.kt @@ -0,0 +1,40 @@ +package com.r3corda.schemas + +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Table + +/** + * An object used to fully qualify the [CashSchema] family name (i.e. independent of version). + */ +object CashSchema + +/** + * First version of a cash contract ORM schema that maps all fields of the [Cash] contract state as it stood + * at the time of writing. + */ +object CashSchemaV1 : MappedSchema(schemaFamily = CashSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCashState::class.java)) { + @Entity + @Table(name = "cash_states") + class PersistentCashState( + @Column(name = "encumbrance") + var encumbrance: Int?, + + @Column(name = "owner_key") + var owner: String, + + @Column(name = "pennies") + var pennies: Long, + + @Column(name = "ccy_code", length = 3) + var currency: String, + + @Column(name = "issuer_key") + var issuerParty: String, + + @Column(name = "issuer_ref") + var issuerRef: ByteArray + ) : PersistentState() +} diff --git a/contracts/src/main/kotlin/com/r3corda/schemas/CommercialPaperSchemaV1.kt b/contracts/src/main/kotlin/com/r3corda/schemas/CommercialPaperSchemaV1.kt new file mode 100644 index 0000000000..19a46786cf --- /dev/null +++ b/contracts/src/main/kotlin/com/r3corda/schemas/CommercialPaperSchemaV1.kt @@ -0,0 +1,47 @@ +package com.r3corda.schemas + +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import java.time.Instant +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Table + +/** + * An object used to fully qualify the [CommercialPaperSchema] family name (i.e. independent of version). + */ +object CommercialPaperSchema + +/** + * First version of a commercial paper contract ORM schema that maps all fields of the [CommercialPaper] contract state + * as it stood at the time of writing. + */ +object CommercialPaperSchemaV1 : MappedSchema(schemaFamily = CommercialPaperSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCommericalPaperState::class.java)) { + @Entity + @Table(name = "cp_states") + class PersistentCommericalPaperState( + @Column(name = "issuance_key") + var issuanceParty: String, + + @Column(name = "issuance_ref") + var issuanceRef: ByteArray, + + @Column(name = "owner_key") + var owner: String, + + @Column(name = "maturity_instant") + var maturity: Instant, + + @Column(name = "face_value") + var faceValue: Long, + + @Column(name = "ccy_code", length = 3) + var currency: String, + + @Column(name = "face_value_issuer_key") + var faceValueIssuerParty: String, + + @Column(name = "face_value_issuer_ref") + var faceValueIssuerRef: ByteArray + ) : PersistentState() +} diff --git a/core/build.gradle b/core/build.gradle index 2f060971a7..6eef9feb06 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -82,6 +82,9 @@ dependencies { // Bouncy castle support needed for X509 certificate manipulation compile "org.bouncycastle:bcprov-jdk15on:${bouncycastle_version}" compile "org.bouncycastle:bcpkix-jdk15on:${bouncycastle_version}" + + // JPA 2.1 annotations. + compile "org.hibernate.javax.persistence:hibernate-jpa-2.1-api:1.0.0.Final" } quasarScan.dependsOn('classes') diff --git a/core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt b/core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt new file mode 100644 index 0000000000..eeda09128a --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt @@ -0,0 +1,66 @@ +package com.r3corda.core.schemas + +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.serialization.toHexString +import java.io.Serializable +import javax.persistence.Column +import javax.persistence.Embeddable +import javax.persistence.EmbeddedId +import javax.persistence.MappedSuperclass + +//DOCSTART QueryableState +/** + * A contract state that may be mapped to database schemas configured for this node to support querying for, + * or filtering of, states. + */ +interface QueryableState : ContractState { + /** + * Enumerate the schemas this state can export representations of itself as. + */ + fun supportedSchemas(): Iterable + + /** + * Export a representation for the given schema. + */ + fun generateMappedObject(schema: MappedSchema): PersistentState +} +//DOCEND QueryableState + +//DOCSTART MappedSchema +/** + * A database schema that might be configured for this node. As well as a name and version for identifying the schema, + * also list the classes that may be used in the generated object graph in order to configure the ORM tool. + * + * @param schemaFamily A class to fully qualify the name of a schema family (i.e. excludes version) + * @param version The version number of this instance within the family. + * @param mappedTypes The JPA entity classes that the ORM layer needs to be configure with for this schema. + */ +abstract class MappedSchema(schemaFamily: Class<*>, + val version: Int, + val mappedTypes: Iterable>) { + val name: String = schemaFamily.name + override fun toString(): String = "${this.javaClass.simpleName}(name=$name, version=$version)" +} +//DOCEND MappedSchema + +/** + * A super class for all mapped states exported to a schema that ensures the [StateRef] appears on the database row. The + * [StateRef] will be set to the correct value by the framework (there's no need to set during mapping generation by the state itself). + */ +@MappedSuperclass open class PersistentState(@EmbeddedId var stateRef: PersistentStateRef? = null) + +/** + * Embedded [StateRef] representation used in state mapping. + */ +@Embeddable +data class PersistentStateRef( + @Column(name = "transaction_id", length = 64) + var txId: String?, + + @Column(name = "output_index") + var index: Int? +) : Serializable { + constructor(stateRef: StateRef) : this(stateRef.txhash.bits.toHexString(), stateRef.index) + constructor() : this(null, null) +} diff --git a/core/src/main/kotlin/com/r3corda/core/serialization/ByteArrays.kt b/core/src/main/kotlin/com/r3corda/core/serialization/ByteArrays.kt index a02d533237..593baac325 100644 --- a/core/src/main/kotlin/com/r3corda/core/serialization/ByteArrays.kt +++ b/core/src/main/kotlin/com/r3corda/core/serialization/ByteArrays.kt @@ -25,7 +25,7 @@ open class OpaqueBytes(val bits: ByteArray) { } override fun hashCode() = Arrays.hashCode(bits) - override fun toString() = "[" + BaseEncoding.base16().encode(bits) + "]" + override fun toString() = "[" + bits.toHexString() + "]" val size: Int get() = bits.size @@ -34,3 +34,5 @@ open class OpaqueBytes(val bits: ByteArray) { } fun ByteArray.opaque(): OpaqueBytes = OpaqueBytes(this) +fun ByteArray.toHexString() = BaseEncoding.base16().encode(this) +fun String.parseAsHex() = BaseEncoding.base16().decode(this) \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index 83c492a821..2b9aa8831c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -34,6 +34,7 @@ Read on to learn: transaction-data-types consensus messaging + persistence creating-a-cordapp running-the-demos node-administration diff --git a/docs/source/persistence.rst b/docs/source/persistence.rst new file mode 100644 index 0000000000..e40e02f3fe --- /dev/null +++ b/docs/source/persistence.rst @@ -0,0 +1,87 @@ +Persistence +=========== + +Corda offers developers the option to expose all or some part of a contract state to an *Object Relational Mapping* (ORM) tool +to be persisted in a RDBMS. The purpose of this is to assist *vault* development by effectively indexing +persisted contract states held in the vault for the purpose of running queries over them and to allow relational joins +between Corda data and private data local to the organisation owning a node. + +The ORM mapping is specified using the `Java Persistence API `_ (JPA) +as annotations and is converted to database table rows by the node automatically every time a state is recorded in the +node's local vault as part of a transaction. + +.. note:: Presently the node includes an instance of the H2 database but any database that supports JDBC is a candidate and + the node will in the future support a range of database implementations via their JDBC drivers. + +Schemas +------- + +Every ``ContractState`` can implement the ``QueryableState`` interface if it wishes to be inserted into the node's local +database and accessible using SQL. + +.. literalinclude:: ../../core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt + :language: kotlin + :start-after: DOCSTART QueryableState + :end-before: DOCEND QueryableState + +The ``QueryableState`` interface requires the state to enumerate the different relational schemas it supports, for instance in +cases where the schema has evolved, with each one being represented by a ``MappedSchema`` object return by the +``supportedSchemas()`` method. Once a schema is selected it must generate that representation when requested via the +``generateMappedObject()`` method which is then passed to the ORM. + +Nodes have an internal ``SchemaService`` which decides what to persist and what not by selecting the ``MappedSchema`` +to use. + +.. literalinclude:: ../../node/src/main/kotlin/com/r3corda/node/services/api/SchemaService.kt + :language: kotlin + :start-after: DOCSTART SchemaService + :end-before: DOCEND SchemaService + +.. literalinclude:: ../../core/src/main/kotlin/com/r3corda/core/schemas/PersistentTypes.kt + :language: kotlin + :start-after: DOCSTART MappedSchema + :end-before: DOCEND MappedSchema + +The ``SchemaService`` can be configured by a node administrator to select the schemas used by each app. In this way the +relational view of ledger states can evolve in a controlled fashion in lock-step with internal systems or other +integration points and not necessarily with every upgrade to the contract code. +It can select from the ``MappedSchema`` offered by a ``QueryableState``, automatically upgrade to a +later version of a schema or even provide a ``MappedSchema`` not originally offered by the ``QueryableState``. + +It is expected that multiple different contract state implementations might provide mappings to some common schema. +For example an Interest Rate Swap contract and an Equity OTC Option contract might both provide a mapping to a common +Derivative schema. The schemas should typically not be part of the contract itself and should exist independently of it +to encourage re-use of a common set within a particular buisness area or Cordapp. + +``MappedSchema`` offer a family name that is disambiguated using Java package style name-spacing derived from the class name +of a *schema family* class that is constant across versions, allowing the ``SchemaService`` to select a preferred version +of a schema. + +The ``SchemaService`` is also responsible for the ``SchemaOptions`` that can be configured for a particular ``MappedSchema`` +which allow the configuration of a database schema or table name prefixes to avoid any clash with other ``MappedSchema``. + +.. note:: It is intended that there should be plugin support for the ``SchemaService`` to offer the version upgrading and + additional schemas as part of Cordapps, and that the active schemas be confgurable. However the present implementation + offers none of this and simply results in all versions of all schemas supported by a ``QueryableState`` being persisted. + This will change in due course. Similarly, it does not currently support configuring ``SchemaOptions`` but will do so in + the future. + +Object Relational Mapping +------------------------- + +The persisted representation of a ``QueryableState`` should be an instance of a ``PersistentState`` subclass, constructed +either by the state itself or a plugin to the ``SchemaService``. This allows the ORM layer to always associate a +``StateRef`` with a persisted representation of a ``ContractState`` and allows joining with the set of unconsumed states +in the vault. + +The ``PersistentState`` subclass should be marked up as a JPA 2.1 *Entity* with a defined table name and having +properties (in Kotlin, getters/setters in Java) annotated to map to the appropriate columns and SQL types. Additional +entities can be included to model these properties where they are more complex, for example collections, so the mapping +does not have to be *flat*. The ``MappedSchema`` must provide a list of all of the JPA entity classes for that schema in order +to initialise the ORM layer. + +Several examples of entities and mappings are provided in the codebase, including ``Cash.State`` and +``CommercialPaper.State``. For example, here's the first version of the cash schema. + +.. literalinclude:: ../../contracts/src/main/kotlin/com/r3corda/schemas/CashSchemaV1.kt + :language: kotlin \ No newline at end of file diff --git a/node/build.gradle b/node/build.gradle index 9de44530bf..532db0a56f 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -130,6 +130,10 @@ dependencies { // SQL connection pooling library compile "com.zaxxer:HikariCP:2.4.7" + // Hibernate: an object relational mapper for writing state objects to the database automatically. + compile "org.hibernate:hibernate-core:5.2.2.Final" + compile "org.hibernate:hibernate-java8:5.2.2.Final" + // Integration test helpers integrationTestCompile 'junit:junit:4.12' diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 2866e8512a..4df5fd20df 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -40,6 +40,8 @@ import com.r3corda.node.services.network.NetworkMapService.RegistrationResponse import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.PersistentNetworkMapService import com.r3corda.node.services.persistence.* +import com.r3corda.node.services.schema.HibernateObserver +import com.r3corda.node.services.schema.NodeSchemaService import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.SimpleNotaryService @@ -105,6 +107,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap override val identityService: IdentityService get() = identity override val schedulerService: SchedulerService get() = scheduler override val clock: Clock = platformClock + override val schemaService: SchemaService get() = schemas // Internal only override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) @@ -147,6 +150,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap lateinit var api: APIServer lateinit var scheduler: SchedulerService lateinit var protocolLogicFactory: ProtocolLogicRefFactory + lateinit var schemas: SchemaService val customServices: ArrayList = ArrayList() protected val runOnStop: ArrayList = ArrayList() lateinit var database: Database @@ -190,6 +194,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap checkpointStorage = storageServices.second netMapCache = InMemoryNetworkMapCache() net = makeMessagingService() + schemas = makeSchemaService() vault = makeVaultService() identity = makeIdentityService() @@ -237,6 +242,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap // Add vault observers CashBalanceAsMetricsObserver(services) ScheduledActivityObserver(services) + HibernateObserver(services) checkpointStorage.forEach { isPreviousCheckpointsPresent = true @@ -407,6 +413,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap // TODO: sort out ordering of open & protected modifiers of functions in this class. protected open fun makeVaultService(): VaultService = NodeVaultService(services) + protected open fun makeSchemaService(): SchemaService = NodeSchemaService() + open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/SchemaService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/SchemaService.kt new file mode 100644 index 0000000000..166d3a0335 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/api/SchemaService.kt @@ -0,0 +1,34 @@ +package com.r3corda.node.services.api + +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import com.r3corda.core.schemas.QueryableState + +//DOCSTART SchemaService +/** + * A configuration and customisation point for Object Relational Mapping of contract state objects. + */ +interface SchemaService { + /** + * Represents any options configured on the node for a schema. + */ + data class SchemaOptions(val databaseSchema: String?, val tablePrefix: String?) + + /** + * Options configured for this node's schemas. A missing entry for a schema implies all properties are null. + */ + val schemaOptions: Map + + /** + * Given a state, select schemas to map it to that are supported by [generateMappedObject] and that are configured + * for this node. + */ + fun selectSchemas(state: QueryableState): Iterable + + /** + * Map a state to a [PersistentState] for the given schema, either via direct support from the state + * or via custom logic in this service. + */ + fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState +} +//DOCEND SchemaService diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index f087fcca94..b2e958cd2d 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -40,6 +40,7 @@ private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java) abstract class ServiceHubInternal : ServiceHub { abstract val monitoringService: MonitoringService abstract val protocolLogicRefFactory: ProtocolLogicRefFactory + abstract val schemaService: SchemaService abstract override val networkService: MessagingServiceInternal diff --git a/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt new file mode 100644 index 0000000000..e0848269a1 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt @@ -0,0 +1,124 @@ +package com.r3corda.node.services.schema + +import com.r3corda.core.contracts.ContractState +import com.r3corda.core.contracts.StateAndRef +import com.r3corda.core.contracts.StateRef +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentStateRef +import com.r3corda.core.schemas.QueryableState +import com.r3corda.core.utilities.debug +import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.services.api.ServiceHubInternal +import kotlinx.support.jdk7.use +import org.hibernate.SessionFactory +import org.hibernate.boot.model.naming.Identifier +import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl +import org.hibernate.cfg.Configuration +import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider +import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment +import org.hibernate.service.UnknownUnwrapTypeException +import org.jetbrains.exposed.sql.transactions.TransactionManager +import java.sql.Connection +import java.util.concurrent.ConcurrentHashMap + +/** + * A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate. + */ +// TODO: Manage version evolution of the schemas via additional tooling. +class HibernateObserver(services: ServiceHubInternal) { + companion object { + val logger = loggerFor() + } + + val schemaService = services.schemaService + // TODO: make this a guava cache or similar to limit ability for this to grow forever. + val sessionFactories = ConcurrentHashMap() + + init { + services.vaultService.updates.subscribe { persist(it.produced) } + } + + private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory { + return sessionFactories.computeIfAbsent(schema, { makeSessionFactoryForSchema(it) }) + } + + private fun makeSessionFactoryForSchema(schema: MappedSchema): SessionFactory { + logger.info("Creating session factory for schema $schema") + // We set a connection provider as the auto schema generation requires it. The auto schema generation will not + // necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though. + // TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs. + val config = Configuration().setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name) + .setProperty("hibernate.hbm2ddl.auto", "update") + .setProperty("hibernate.show_sql", "true") + .setProperty("hibernate_format_sql", "true") + val options = schemaService.schemaOptions[schema] + val databaseSchema = options?.databaseSchema + if (databaseSchema != null) { + logger.debug { "Database schema = $databaseSchema" } + config.setProperty("hibernate.default_schema", databaseSchema) + } + val tablePrefix = options?.tablePrefix + if (tablePrefix != null) { + logger.debug { "Table prefix = $tablePrefix" } + config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() { + override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier { + val default = super.toPhysicalTableName(name, context) + return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted) + } + }) + } + schema.mappedTypes.forEach { config.addAnnotatedClass(it) } + val sessionFactory = config.buildSessionFactory() + logger.info("Created session factory for schema $schema") + return sessionFactory + } + + private fun persist(produced: Set>) { + produced.forEach { persistState(it) } + } + + private fun persistState(stateAndRef: StateAndRef) { + val state = stateAndRef.state.data + if (state is QueryableState) { + logger.debug { "Asked to persist state ${stateAndRef.ref}" } + schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) } + } + } + + private fun persistStateWithSchema(state: QueryableState, stateRef: StateRef, schema: MappedSchema) { + val sessionFactory = sessionFactoryForSchema(schema) + val session = sessionFactory.openStatelessSession(TransactionManager.current().connection) + session.use { + val mappedObject = schemaService.generateMappedObject(state, schema) + mappedObject.stateRef = PersistentStateRef(stateRef) + session.insert(mappedObject) + } + } + + // Supply Hibernate with connections from our underlying Exposed database integration. Only used + // during schema creation / update. + class NodeDatabaseConnectionProvider : ConnectionProvider { + override fun closeConnection(conn: Connection) { + val tx = TransactionManager.current() + tx.commit() + tx.close() + } + + override fun supportsAggressiveRelease(): Boolean = true + + override fun getConnection(): Connection { + val tx = TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ) + return tx.connection + } + + override fun unwrap(unwrapType: Class?): T { + if (unwrapType == NodeDatabaseConnectionProvider::class.java) { + return unwrapType.cast(this) + } else { + throw UnknownUnwrapTypeException(unwrapType) + } + } + + override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = (unwrapType == NodeDatabaseConnectionProvider::class.java) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/com/r3corda/node/services/schema/NodeSchemaService.kt new file mode 100644 index 0000000000..bc9f747ec2 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/schema/NodeSchemaService.kt @@ -0,0 +1,29 @@ +package com.r3corda.node.services.schema + +import com.r3corda.core.schemas.MappedSchema +import com.r3corda.core.schemas.PersistentState +import com.r3corda.core.schemas.QueryableState +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.node.services.api.SchemaService + +/** + * Most basic implementation of [SchemaService]. + * + * TODO: support loading schema options from node configuration. + * TODO: support configuring what schemas are to be selected for persistence. + * TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState]. + */ +class NodeSchemaService : SchemaService, SingletonSerializeAsToken() { + // Currently does not support configuring schema options. + override val schemaOptions: Map = emptyMap() + + // Currently returns all schemas supported by the state, with no filtering or enrichment. + override fun selectSchemas(state: QueryableState): Iterable { + return state.supportedSchemas() + } + + // Because schema is always one supported by the state, just delegate. + override fun generateMappedObject(state: QueryableState, schema: MappedSchema): PersistentState { + return state.generateMappedObject(schema) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt index 72712c2425..a94d6938e8 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt @@ -9,6 +9,8 @@ import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.Vault import com.r3corda.core.node.services.VaultService import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.core.serialization.parseAsHex +import com.r3corda.core.serialization.toHexString import com.r3corda.core.transactions.WireTransaction import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace @@ -16,7 +18,6 @@ import com.r3corda.node.utilities.AbstractJDBCHashSet import com.r3corda.node.utilities.JDBCHashedTable import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement -import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey @@ -39,16 +40,16 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT } private object StatesSetTable : JDBCHashedTable("vault_unconsumed_states") { - val txhash = binary("transaction_id", 32) + val txhash = varchar("transaction_id", 64) val index = integer("output_index") } private val mutex = ThreadBox(object { val unconsumedStates = object : AbstractJDBCHashSet(StatesSetTable) { - override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash]), it[table.index]) + override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash].parseAsHex()), it[table.index]) override fun addElementToInsert(it: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) { - it[table.txhash] = entry.txhash.bits + it[table.txhash] = entry.txhash.bits.toHexString() it[table.index] = entry.index } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 70414423bb..71afcfe511 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -6,7 +6,7 @@ keyStorePassword = "cordacadevpass" trustStorePassword = "trustpass" dataSourceProperties = { dataSourceClassName = org.h2.jdbcx.JdbcDataSource - "dataSource.url" = "jdbc:h2:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true" + "dataSource.url" = "jdbc:h2:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true;WRITE_DELAY=0" "dataSource.user" = sa "dataSource.password" = "" } diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index b633dd6c3c..76d79d4869 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -18,6 +18,7 @@ import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.core.utilities.LogHelper import com.r3corda.core.utilities.TEST_TX_TIME +import com.r3corda.node.internal.AbstractNode import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.PerFileTransactionStorage @@ -88,12 +89,12 @@ class TwoPartyTradeProtocolTests { aliceNode.disableDBCloseOnStop() bobNode.disableDBCloseOnStop() - bobNode.services.fillWithSomeTestCash(2000.DOLLARS) + databaseTransaction(bobNode.database) { + bobNode.services.fillWithSomeTestCash(2000.DOLLARS) + } val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey, 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second - - insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey, notaryNode.storage.myLegalIdentityKey) - + insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey, notaryNode.storage.myLegalIdentityKey) val (bobPsm, aliceResult) = runBuyerAndSeller("alice's paper".outputStateAndRef()) // TODO: Verify that the result was inserted into the transaction database. @@ -126,11 +127,12 @@ class TwoPartyTradeProtocolTests { net.runNetwork() // Clear network map registration messages - bobNode.services.fillWithSomeTestCash(2000.DOLLARS) + databaseTransaction(bobNode.database) { + bobNode.services.fillWithSomeTestCash(2000.DOLLARS) + } val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey, 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, null).second - insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) - + insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey) val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerResult // Everything is on this thread so we can now step through the protocol one step at a time. @@ -227,10 +229,10 @@ class TwoPartyTradeProtocolTests { val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray())) val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second - val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode.services) + val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode) val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey, 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second - val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) + val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey) net.runNetwork() // Clear network map registration messages @@ -318,10 +320,10 @@ class TwoPartyTradeProtocolTests { val attachmentID = attachment(ByteArrayInputStream(stream.toByteArray())) val bobsFakeCash = fillUpForBuyer(false, bobNode.keyManagement.freshKey().public).second - insertFakeTransactions(bobsFakeCash, bobNode.services) + insertFakeTransactions(bobsFakeCash, bobNode) val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey, 1200.DOLLARS `issued by` DUMMY_CASH_ISSUER, attachmentID).second - insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) + insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey) net.runNetwork() // Clear network map registration messages @@ -407,8 +409,8 @@ class TwoPartyTradeProtocolTests { val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.storage.myLegalIdentity.owningKey, 1200.DOLLARS `issued by` issuer, null).second - insertFakeTransactions(bobsBadCash, bobNode.services, bobNode.storage.myLegalIdentityKey, bobNode.storage.myLegalIdentityKey) - insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey) + insertFakeTransactions(bobsBadCash, bobNode, bobNode.storage.myLegalIdentityKey, bobNode.storage.myLegalIdentityKey) + insertFakeTransactions(alicesFakePaper, aliceNode, aliceNode.storage.myLegalIdentityKey) net.runNetwork() // Clear network map registration messages @@ -435,15 +437,17 @@ class TwoPartyTradeProtocolTests { private fun insertFakeTransactions( wtxToSign: List, - services: ServiceHub, + node: AbstractNode, vararg extraKeys: KeyPair): Map { - val signed: List = signAll(wtxToSign, extraKeys.toList() + DUMMY_CASH_ISSUER_KEY) - services.recordTransactions(signed) - val validatedTransactions = services.storageService.validatedTransactions - if (validatedTransactions is RecordingTransactionStorage) { - validatedTransactions.records.clear() + return databaseTransaction(node.database) { + val signed: List = signAll(wtxToSign, extraKeys.toList() + DUMMY_CASH_ISSUER_KEY) + node.services.recordTransactions(signed) + val validatedTransactions = node.services.storageService.validatedTransactions + if (validatedTransactions is RecordingTransactionStorage) { + validatedTransactions.records.clear() + } + return@databaseTransaction signed.associateBy { it.id } } - return signed.associateBy { it.id } } private fun LedgerDSL.fillUpForBuyer( diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt index 657d23a79b..1f67a3726a 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServiceHubInternal.kt @@ -12,7 +12,9 @@ import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.ServiceHubInternal +import com.r3corda.node.services.api.SchemaService import com.r3corda.node.services.persistence.DataVending +import com.r3corda.node.services.schema.NodeSchemaService import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.testing.MOCK_IDENTITY_SERVICE import com.r3corda.testing.node.MockNetworkMapCache @@ -31,7 +33,8 @@ open class MockServiceHubInternal( val mapCache: NetworkMapCache? = MockNetworkMapCache(), val scheduler: SchedulerService? = null, val overrideClock: Clock? = NodeClock(), - val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory() + val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory(), + val schemas: SchemaService? = NodeSchemaService() ) : ServiceHubInternal() { override val vaultService: VaultService = customVault ?: InMemoryVaultService(this) override val keyManagementService: KeyManagementService @@ -52,6 +55,8 @@ open class MockServiceHubInternal( override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolFactory ?: throw UnsupportedOperationException() + override val schemaService: SchemaService + get() = schemas ?: throw UnsupportedOperationException() // We isolate the storage service with writable TXes so that it can't be accessed except via recordTransactions() private val txStorageService: TxWritableStorageService