diff --git a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt index 60816009cb..9cf24a81fc 100644 --- a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt @@ -4,13 +4,15 @@ import net.corda.core.DeleteForDJVM import net.corda.core.flows.FlowLogic import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.node.services.vault.CordaTransactionSupport import rx.Observable /** - * A [CordaService] annotated class requires a constructor taking a + * A [net.corda.core.node.services.CordaService] annotated class requires a constructor taking a * single parameter of type [AppServiceHub]. - * With the [AppServiceHub] parameter a [CordaService] is able to access to privileged operations. - * In particular such a [CordaService] can initiate and track flows marked with [net.corda.core.flows.StartableByService]. + * With the [AppServiceHub] parameter a [net.corda.core.node.services.CordaService] is able to access to privileged operations. + * In particular such a [net.corda.core.node.services.CordaService] can initiate and track flows marked + * with [net.corda.core.flows.StartableByService]. */ @DeleteForDJVM interface AppServiceHub : ServiceHub { @@ -28,4 +30,12 @@ interface AppServiceHub : ServiceHub { * TODO it is assumed here that the flow object has an appropriate classloader. */ fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle + + /** + * Accessor to [CordaTransactionSupport] in order to perform sensitive actions within new, independent top level transaction. + * + * There are times when a user thread may want to perform certain actions within a new top level DB transaction. This will be an + * independent transaction from those used in the framework. + */ + val database: CordaTransactionSupport } diff --git a/core/src/main/kotlin/net/corda/core/node/services/vault/CordaTransactionSupport.kt b/core/src/main/kotlin/net/corda/core/node/services/vault/CordaTransactionSupport.kt new file mode 100644 index 0000000000..5b371f854a --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/services/vault/CordaTransactionSupport.kt @@ -0,0 +1,12 @@ +package net.corda.core.node.services.vault + +import net.corda.core.DoNotImplement + +@DoNotImplement +interface CordaTransactionSupport { + /** + * Executes given statement in the scope of transaction with default transaction isolation level. + * @param statement to be executed in the scope of this transaction. + */ + fun transaction(statement: SessionScope.() -> T): T +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/services/vault/SessionScope.kt b/core/src/main/kotlin/net/corda/core/node/services/vault/SessionScope.kt new file mode 100644 index 0000000000..18bf95e0e3 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/services/vault/SessionScope.kt @@ -0,0 +1,12 @@ +package net.corda.core.node.services.vault + +import net.corda.core.DoNotImplement +import org.hibernate.Session + +/** + * Represents scope for the operation when JPA [Session] been created, i.e. transaction started. + */ +@DoNotImplement +interface SessionScope { + val session: Session +} \ No newline at end of file diff --git a/detekt-baseline.xml b/detekt-baseline.xml index d87a2ce4d2..d737c814fa 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -1321,7 +1321,7 @@ MaxLineLength:AbstractNode.kt$AbstractNode$throw IllegalStateException("CryptoService and signingCertificateStore are not aligned, the entry for key-alias: $alias is only found in $keyExistsIn") MaxLineLength:AbstractNode.kt$AbstractNode$val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(configuration.cordappDirectories), attachments).tokenize() MaxLineLength:AbstractNode.kt$AbstractNode$val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersStorage, transactionStorage).also { attachments.servicesForResolution = it } - MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$private + MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$override val database: CordaTransactionSupport MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } MaxLineLength:AbstractNode.kt$ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex) MaxLineLength:AbstractNode.kt$ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html") @@ -3924,6 +3924,7 @@ ThrowsCount:JacksonSupport.kt$JacksonSupport.PartyDeserializer$private fun lookupByNameSegment(mapper: PartyObjectMapper, parser: JsonParser): Party ThrowsCount:JarScanningCordappLoader.kt$JarScanningCordappLoader$private fun parseVersion(versionStr: String?, attributeName: String): Int ThrowsCount:LedgerDSLInterpreter.kt$Verifies$ fun failsWith(expectedMessage: String?): EnforceVerifyOrFail + ThrowsCount:MockServices.kt$ fun <T : SerializeAsToken> createMockCordaService(serviceHub: MockServices, serviceConstructor: (AppServiceHub) -> T): T ThrowsCount:NetworkRegistrationHelper.kt$NetworkRegistrationHelper$private fun validateCertificates(registeringPublicKey: PublicKey, certificates: List<X509Certificate>) ThrowsCount:NodeInfoFilesCopier.kt$NodeInfoFilesCopier$private fun atomicCopy(source: Path, destination: Path) ThrowsCount:NodeVaultService.kt$NodeVaultService$@Throws(VaultQueryException::class) private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T> diff --git a/docs/source/api-service-classes.rst b/docs/source/api-service-classes.rst index e9e7098c90..2d87b19790 100644 --- a/docs/source/api-service-classes.rst +++ b/docs/source/api-service-classes.rst @@ -56,6 +56,9 @@ Below is an empty implementation of a Service class: The ``AppServiceHub`` provides the ``ServiceHub`` functionality to the Service class, with the extra ability to start flows. Starting flows from ``AppServiceHub`` is explained further in :ref:`Starting Flows from a Service `. +The ``AppServiceHub`` also provides access to ``database`` which will enable the Service class to perform DB transactions from the threads +managed by the Service. + Code can be run during node startup when the class is being initialised. To do so, place the code into the ``init`` block or constructor. This is useful when a service needs to establish a connection to an external database or setup observables via ``ServiceHub.trackBy`` during its startup. These can then persist during the service's lifetime. diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index f8504e219a..f80cd39266 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,6 +7,9 @@ release, see :doc:`app-upgrade-notes`. Unreleased ---------- +* ``AppServiceHub`` been extended to provide access to ``database`` which will enable the Service class to perform DB transactions + from the threads managed by the custom Service. + * Moved and renamed the testing web server to the ``testing`` subproject. Also renamed the published artifact to ``corda-testserver.jar``. * New Vault Query criteria to specify exact matches for specified participants. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaTransactionSupportImpl.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaTransactionSupportImpl.kt new file mode 100644 index 0000000000..2ab8d144d5 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaTransactionSupportImpl.kt @@ -0,0 +1,21 @@ +package net.corda.nodeapi.internal.persistence + +import net.corda.core.node.services.vault.CordaTransactionSupport +import net.corda.core.node.services.vault.SessionScope + +/** + * Helper class that wraps [CordaPersistence] and limits operations on it down to methods exposed by [CordaTransactionSupport]. + */ +class CordaTransactionSupportImpl(private val persistence: CordaPersistence) : CordaTransactionSupport { + override fun transaction(statement: SessionScope.() -> T): T { + // An alternative approach could be to make `DatabaseTransaction` extend from `SessionScope`, but this will introduce a hierarchical + // dependency which might be unwanted in some cases. + fun DatabaseTransaction.innerFunc(): T { + return statement.invoke( + object : SessionScope { + override val session = this@innerFunc.session + }) + } + return persistence.transaction(0, DatabaseTransaction::innerFunc) + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt new file mode 100644 index 0000000000..298fc59c21 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt @@ -0,0 +1,65 @@ +package net.corda.node.services.persistence + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.getOrThrow +import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import java.sql.DriverManager +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class CordaPersistenceServiceTests { + @Test + fun `corda service can save many transactions from different threads`() { + driver(DriverParameters(inMemoryDB = false, startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { + + val port = incrementalPortAllocation().nextPort() + val node = startNode(customOverrides = mapOf("h2Settings.address" to "localhost:$port")).getOrThrow() + + val sampleSize = 100 + val count = node.rpc.startFlow(::MyRpcFlow, sampleSize).returnValue.getOrThrow() + assertEquals(sampleSize, count) + + DriverManager.getConnection("jdbc:h2:tcp://localhost:$port/node", "sa", "").use { + val resultSet = it.createStatement().executeQuery("SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints") + assertTrue(resultSet.next()) + val resultSize = resultSet.getInt(1) + assertEquals(sampleSize, resultSize) + } + } + } + + @StartableByRPC + class MyRpcFlow(private val count: Int) : FlowLogic() { + @Suspendable + override fun call(): Int { + val service = serviceHub.cordaService(MultiThreadedDbLoader::class.java) + return service.createObjects(count) + } + } + + @CordaService + class MultiThreadedDbLoader(private val services: AppServiceHub) : SingletonSerializeAsToken() { + fun createObjects(count: Int) : Int { + (1..count).toList().parallelStream().forEach { + services.database.transaction { + session.save(DBCheckpointStorage.DBCheckpoint().apply { + checkpointId = it.toString() + }) + } + } + + return count + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 8ff4a93093..90fb26e3dc 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -56,6 +56,7 @@ import net.corda.core.node.services.CordaService import net.corda.core.node.services.IdentityService import net.corda.core.node.services.KeyManagementService import net.corda.core.node.services.TransactionVerifierService +import net.corda.core.node.services.vault.CordaTransactionSupport import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken @@ -154,6 +155,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.NODE_IDENTITY_ALIAS_PREFI import net.corda.nodeapi.internal.cryptoservice.CryptoServiceFactory import net.corda.nodeapi.internal.cryptoservice.SupportedCryptoServices import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService +import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -741,7 +743,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, * This customizes the ServiceHub for each CordaService that is initiating flows. */ // TODO Move this into its own file - private class AppServiceHubImpl(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter) : AppServiceHub, ServiceHub by serviceHub { + private class AppServiceHubImpl(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter, + override val database: CordaTransactionSupport) : AppServiceHub, ServiceHub by serviceHub { lateinit var serviceInstance: T override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { val stateMachine = startFlowChecked(flow) @@ -789,7 +792,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, serviceClass.requireAnnotation() val service = try { - val serviceContext = AppServiceHubImpl(services, flowStarter) + val serviceContext = AppServiceHubImpl(services, flowStarter, CordaTransactionSupportImpl(database)) val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } val service = extendedServiceConstructor.newInstance(serviceContext) serviceContext.serviceInstance = service diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index dde4af7310..d057466606 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -18,6 +18,7 @@ import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.core.node.* import net.corda.core.node.services.* +import net.corda.core.node.services.vault.CordaTransactionSupport import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort @@ -488,6 +489,9 @@ fun createMockCordaService(serviceHub: MockServices, serv override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { throw UnsupportedOperationException() } + + override val database: CordaTransactionSupport + get() = throw UnsupportedOperationException() } return MockAppServiceHubImpl(serviceHub, serviceConstructor).serviceInstance }