CORDA-3471: Create CordaTransactionSupport and make it accessible through AppServiceHub (#5768)

* CORDA-3471: Create `CordaTransactionSupport` and use wherever possible instead of `CordaPersistence`

* CORDA-3471: Address comments by @mnesbit
- Relocate `CordaTransactionSupport` to `core`
- Create a lighter version of transaction - `VaultTransaction` that gives access to `session` object only.

* CORDA-3471: More changes after discussion with  @mnesbit
- Rename `VaultTransaction` into `SessionScope`.

* CORDA-3471: Revert changes to most of the files after conversation with @mnesbit and @rick-r3

* CORDA-3471: Introduce `CordaTransactionSupportImpl` and make it accessible via `AppServiceHub`.

* CORDA-3471: Minor change (comment).

* CORDA-3471: Address input from @mnesbit

* CORDA-3471: Address input from @rick-r3

* CORDA-3471: Make Detekt happier

* CORDA-3471: Add a new test that proves transactions can be started from client threads

As requested by @mnesbit

* CORDA-3471: Change log and documentation update.

As requested by @mnesbit
This commit is contained in:
Viktor Kolomeyko 2019-12-04 17:18:40 +00:00 committed by Rick Parker
parent 5a41ec9b82
commit 43205e1f1a
10 changed files with 140 additions and 6 deletions

View File

@ -4,13 +4,15 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.node.services.vault.CordaTransactionSupport
import rx.Observable
/**
* A [CordaService] annotated class requires a constructor taking a
* A [net.corda.core.node.services.CordaService] annotated class requires a constructor taking a
* single parameter of type [AppServiceHub].
* With the [AppServiceHub] parameter a [CordaService] is able to access to privileged operations.
* In particular such a [CordaService] can initiate and track flows marked with [net.corda.core.flows.StartableByService].
* With the [AppServiceHub] parameter a [net.corda.core.node.services.CordaService] is able to access to privileged operations.
* In particular such a [net.corda.core.node.services.CordaService] can initiate and track flows marked
* with [net.corda.core.flows.StartableByService].
*/
@DeleteForDJVM
interface AppServiceHub : ServiceHub {
@ -28,4 +30,12 @@ interface AppServiceHub : ServiceHub {
* TODO it is assumed here that the flow object has an appropriate classloader.
*/
fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T>
/**
* Accessor to [CordaTransactionSupport] in order to perform sensitive actions within new, independent top level transaction.
*
* There are times when a user thread may want to perform certain actions within a new top level DB transaction. This will be an
* independent transaction from those used in the framework.
*/
val database: CordaTransactionSupport
}

View File

@ -0,0 +1,12 @@
package net.corda.core.node.services.vault
import net.corda.core.DoNotImplement
@DoNotImplement
interface CordaTransactionSupport {
/**
* Executes given statement in the scope of transaction with default transaction isolation level.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(statement: SessionScope.() -> T): T
}

View File

@ -0,0 +1,12 @@
package net.corda.core.node.services.vault
import net.corda.core.DoNotImplement
import org.hibernate.Session
/**
* Represents scope for the operation when JPA [Session] been created, i.e. transaction started.
*/
@DoNotImplement
interface SessionScope {
val session: Session
}

View File

@ -1321,7 +1321,7 @@
<ID>MaxLineLength:AbstractNode.kt$AbstractNode$throw IllegalStateException("CryptoService and signingCertificateStore are not aligned, the entry for key-alias: $alias is only found in $keyExistsIn")</ID>
<ID>MaxLineLength:AbstractNode.kt$AbstractNode$val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(configuration.cordappDirectories), attachments).tokenize()</ID>
<ID>MaxLineLength:AbstractNode.kt$AbstractNode$val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersStorage, transactionStorage).also { attachments.servicesForResolution = it }</ID>
<ID>MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$private</ID>
<ID>MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$override val database: CordaTransactionSupport</ID>
<ID>MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" }</ID>
<ID>MaxLineLength:AbstractNode.kt$ex is HikariPool.PoolInitializationException -&gt; throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)</ID>
<ID>MaxLineLength:AbstractNode.kt$ex.cause is ClassNotFoundException -&gt; throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html")</ID>
@ -3924,6 +3924,7 @@
<ID>ThrowsCount:JacksonSupport.kt$JacksonSupport.PartyDeserializer$private fun lookupByNameSegment(mapper: PartyObjectMapper, parser: JsonParser): Party</ID>
<ID>ThrowsCount:JarScanningCordappLoader.kt$JarScanningCordappLoader$private fun parseVersion(versionStr: String?, attributeName: String): Int</ID>
<ID>ThrowsCount:LedgerDSLInterpreter.kt$Verifies$ fun failsWith(expectedMessage: String?): EnforceVerifyOrFail</ID>
<ID>ThrowsCount:MockServices.kt$ fun &lt;T : SerializeAsToken&gt; createMockCordaService(serviceHub: MockServices, serviceConstructor: (AppServiceHub) -&gt; T): T</ID>
<ID>ThrowsCount:NetworkRegistrationHelper.kt$NetworkRegistrationHelper$private fun validateCertificates(registeringPublicKey: PublicKey, certificates: List&lt;X509Certificate&gt;)</ID>
<ID>ThrowsCount:NodeInfoFilesCopier.kt$NodeInfoFilesCopier$private fun atomicCopy(source: Path, destination: Path)</ID>
<ID>ThrowsCount:NodeVaultService.kt$NodeVaultService$@Throws(VaultQueryException::class) private fun &lt;T : ContractState&gt; _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class&lt;out T&gt;, skipPagingChecks: Boolean): Vault.Page&lt;T&gt;</ID>

View File

@ -56,6 +56,9 @@ Below is an empty implementation of a Service class:
The ``AppServiceHub`` provides the ``ServiceHub`` functionality to the Service class, with the extra ability to start flows. Starting flows
from ``AppServiceHub`` is explained further in :ref:`Starting Flows from a Service <starting_flows_from_a_service>`.
The ``AppServiceHub`` also provides access to ``database`` which will enable the Service class to perform DB transactions from the threads
managed by the Service.
Code can be run during node startup when the class is being initialised. To do so, place the code into the ``init`` block or constructor.
This is useful when a service needs to establish a connection to an external database or setup observables via ``ServiceHub.trackBy`` during
its startup. These can then persist during the service's lifetime.

View File

@ -7,6 +7,9 @@ release, see :doc:`app-upgrade-notes`.
Unreleased
----------
* ``AppServiceHub`` been extended to provide access to ``database`` which will enable the Service class to perform DB transactions
from the threads managed by the custom Service.
* Moved and renamed the testing web server to the ``testing`` subproject. Also renamed the published artifact to ``corda-testserver.jar``.
* New Vault Query criteria to specify exact matches for specified participants.

View File

@ -0,0 +1,21 @@
package net.corda.nodeapi.internal.persistence
import net.corda.core.node.services.vault.CordaTransactionSupport
import net.corda.core.node.services.vault.SessionScope
/**
* Helper class that wraps [CordaPersistence] and limits operations on it down to methods exposed by [CordaTransactionSupport].
*/
class CordaTransactionSupportImpl(private val persistence: CordaPersistence) : CordaTransactionSupport {
override fun <T> transaction(statement: SessionScope.() -> T): T {
// An alternative approach could be to make `DatabaseTransaction` extend from `SessionScope`, but this will introduce a hierarchical
// dependency which might be unwanted in some cases.
fun DatabaseTransaction.innerFunc(): T {
return statement.invoke(
object : SessionScope {
override val session = this@innerFunc.session
})
}
return persistence.transaction(0, DatabaseTransaction::innerFunc)
}
}

View File

@ -0,0 +1,65 @@
package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.sql.DriverManager
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class CordaPersistenceServiceTests {
@Test
fun `corda service can save many transactions from different threads`() {
driver(DriverParameters(inMemoryDB = false, startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val port = incrementalPortAllocation().nextPort()
val node = startNode(customOverrides = mapOf("h2Settings.address" to "localhost:$port")).getOrThrow()
val sampleSize = 100
val count = node.rpc.startFlow(::MyRpcFlow, sampleSize).returnValue.getOrThrow()
assertEquals(sampleSize, count)
DriverManager.getConnection("jdbc:h2:tcp://localhost:$port/node", "sa", "").use {
val resultSet = it.createStatement().executeQuery("SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints")
assertTrue(resultSet.next())
val resultSize = resultSet.getInt(1)
assertEquals(sampleSize, resultSize)
}
}
}
@StartableByRPC
class MyRpcFlow(private val count: Int) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
val service = serviceHub.cordaService(MultiThreadedDbLoader::class.java)
return service.createObjects(count)
}
}
@CordaService
class MultiThreadedDbLoader(private val services: AppServiceHub) : SingletonSerializeAsToken() {
fun createObjects(count: Int) : Int {
(1..count).toList().parallelStream().forEach {
services.database.transaction {
session.save(DBCheckpointStorage.DBCheckpoint().apply {
checkpointId = it.toString()
})
}
}
return count
}
}
}

View File

@ -56,6 +56,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.node.services.vault.CordaTransactionSupport
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
@ -154,6 +155,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.NODE_IDENTITY_ALIAS_PREFI
import net.corda.nodeapi.internal.cryptoservice.CryptoServiceFactory
import net.corda.nodeapi.internal.cryptoservice.SupportedCryptoServices
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -741,7 +743,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
* This customizes the ServiceHub for each CordaService that is initiating flows.
*/
// TODO Move this into its own file
private class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter) : AppServiceHub, ServiceHub by serviceHub {
private class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter,
override val database: CordaTransactionSupport) : AppServiceHub, ServiceHub by serviceHub {
lateinit var serviceInstance: T
override fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T> {
val stateMachine = startFlowChecked(flow)
@ -789,7 +792,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
serviceClass.requireAnnotation<CordaService>()
val service = try {
val serviceContext = AppServiceHubImpl<T>(services, flowStarter)
val serviceContext = AppServiceHubImpl<T>(services, flowStarter, CordaTransactionSupportImpl(database))
val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true }
val service = extendedServiceConstructor.newInstance(serviceContext)
serviceContext.serviceInstance = service

View File

@ -18,6 +18,7 @@ import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.CordaTransactionSupport
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
@ -488,6 +489,9 @@ fun <T : SerializeAsToken> createMockCordaService(serviceHub: MockServices, serv
override fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T> {
throw UnsupportedOperationException()
}
override val database: CordaTransactionSupport
get() = throw UnsupportedOperationException()
}
return MockAppServiceHubImpl(serviceHub, serviceConstructor).serviceInstance
}