Expose a JDBC connection (session) via the ServiceHub for generic JDB… (#1000)

* Expose a JDBC connection (session) via the ServiceHub for generic JDBC usage.

* Updated documentation.

* Fix failing JUnit following rebase from master.

* JDBC session Tutorial WIP

* Fix broken JUnits following update to consumeCash() using observable event update as return.

* Fixed broken JUnits.

* Refactoring createSession() into new CordaPersistence class following rebase from master.

* Updated fully working example.

* Minor updates following feedback from MH.

* Fixed compiler error following rebase.

* Fixes following rebase from master.

* Further updates and clarifications to documentation.
This commit is contained in:
josecoll 2017-08-04 09:26:27 +01:00 committed by GitHub
parent 818cbce789
commit 014387162d
12 changed files with 368 additions and 38 deletions

View File

@ -7,6 +7,7 @@ import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import java.security.PublicKey
import java.sql.Connection
import java.time.Clock
/**
@ -215,4 +216,15 @@ interface ServiceHub : ServicesForResolution {
* @return A new [SignedTransaction] with the addition of the new signature.
*/
fun addSignature(signedTransaction: SignedTransaction): SignedTransaction = addSignature(signedTransaction, legalIdentityKey)
/**
* Exposes a JDBC connection (session) object using the currently configured database.
* Applications can use this to execute arbitrary SQL queries (native, direct, prepared, callable)
* against its Node database tables (including custom contract tables defined by extending [Queryable]).
* When used within a flow, this session automatically forms part of the enclosing flow transaction boundary,
* and thus queryable data will include everything committed as of the last checkpoint.
* @throws IllegalStateException if called outside of a transaction.
* @return A new [Connection]
*/
fun jdbcSession(): Connection
}

View File

@ -92,3 +92,32 @@ Several examples of entities and mappings are provided in the codebase, includin
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/schemas/CashSchemaV1.kt
:language: kotlin
JDBC session
------------
Apps may also interact directly with the underlying Node's database by using a standard
JDBC connection (session) as described by the `Java SQL Connection API <https://docs.oracle.com/javase/8/docs/api/java/sql/Connection.html>`_
Use the ``ServiceHub`` ``jdbcSession`` function to obtain a JDBC connection as illustrated in the following example:
.. literalinclude:: ../../node/src/test/kotlin/net/corda/node/services/database/HibernateConfigurationTest.kt
:language: kotlin
:start-after: DOCSTART JdbcSession
:end-before: DOCEND JdbcSession
JDBC session's can be used in Flows and Service Plugins (see ":doc:`flow-state-machines`")
The following example illustrates the creation of a custom corda service using a jdbcSession:
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt
:language: kotlin
:start-after: DOCSTART CustomVaultQuery
:end-before: DOCEND CustomVaultQuery
which is then referenced within a custom flow:
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/CustomVaultQuery.kt
:language: kotlin
:start-after: DOCSTART TopupIssuer
:end-before: DOCEND TopupIssuer

View File

@ -0,0 +1,150 @@
package net.corda.docs
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.flows.*
import java.util.*
// DOCSTART CustomVaultQuery
object CustomVaultQuery {
@CordaService
class Service(val services: PluginServiceHub) : SingletonSerializeAsToken() {
private companion object {
val log = loggerFor<Service>()
}
fun rebalanceCurrencyReserves(): List<Amount<Currency>> {
val nativeQuery = """
select
cashschema.ccy_code,
sum(cashschema.pennies)
from
vault_states vaultschema
join
contract_cash_states cashschema
where
vaultschema.output_index=cashschema.output_index
and vaultschema.transaction_id=cashschema.transaction_id
and vaultschema.state_status=0
group by
cashschema.ccy_code
order by
sum(cashschema.pennies) desc
"""
log.info("SQL to execute: $nativeQuery")
val session = services.jdbcSession()
val prepStatement = session.prepareStatement(nativeQuery)
val rs = prepStatement.executeQuery()
var topUpLimits: MutableList<Amount<Currency>> = mutableListOf()
while (rs.next()) {
val currencyStr = rs.getString(1)
val amount = rs.getLong(2)
log.info("$currencyStr : $amount")
topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr)))
}
return topUpLimits
}
}
}
// DOCEND CustomVaultQuery
/**
* This is a slightly modified version of the IssuerFlow, which uses a 3rd party custom query to
* retrieve a list of currencies and top up amounts to be used in the issuance.
*/
object TopupIssuerFlow {
@CordaSerializable
data class TopupRequest(val issueToParty: Party,
val issuerPartyRef: OpaqueBytes,
val notaryParty: Party)
@InitiatingFlow
@StartableByRPC
class TopupIssuanceRequester(val issueToParty: Party,
val issueToPartyRef: OpaqueBytes,
val issuerBankParty: Party,
val notaryParty: Party) : FlowLogic<List<AbstractCashFlow.Result>>() {
@Suspendable
@Throws(CashException::class)
override fun call(): List<AbstractCashFlow.Result> {
val topupRequest = TopupRequest(issueToParty, issueToPartyRef, notaryParty)
return sendAndReceive<List<AbstractCashFlow.Result>>(issuerBankParty, topupRequest).unwrap { it }
}
}
@InitiatedBy(TopupIssuanceRequester::class)
class TopupIssuer(val otherParty: Party) : FlowLogic<List<SignedTransaction>>() {
companion object {
object AWAITING_REQUEST : ProgressTracker.Step("Awaiting issuance request")
object ISSUING : ProgressTracker.Step("Issuing asset")
object TRANSFERRING : ProgressTracker.Step("Transferring asset to issuance requester")
object SENDING_TOP_UP_ISSUE_REQUEST : ProgressTracker.Step("Requesting asset issue top up")
fun tracker() = ProgressTracker(AWAITING_REQUEST, ISSUING, TRANSFERRING, SENDING_TOP_UP_ISSUE_REQUEST)
}
override val progressTracker: ProgressTracker = tracker()
// DOCSTART TopupIssuer
@Suspendable
@Throws(CashException::class)
override fun call(): List<SignedTransaction> {
progressTracker.currentStep = AWAITING_REQUEST
val topupRequest = receive<TopupRequest>(otherParty).unwrap {
it
}
val customVaultQueryService = serviceHub.cordaService(CustomVaultQuery.Service::class.java)
val reserveLimits = customVaultQueryService.rebalanceCurrencyReserves()
val txns: List<SignedTransaction> = reserveLimits.map { amount ->
// request asset issue
logger.info("Requesting currency issue $amount")
val txn = issueCashTo(amount, topupRequest.issueToParty, topupRequest.issuerPartyRef)
progressTracker.currentStep = SENDING_TOP_UP_ISSUE_REQUEST
return@map txn.stx
}
send(otherParty, txns)
return txns
}
// DOCEND TopupIssuer
@Suspendable
private fun issueCashTo(amount: Amount<Currency>,
issueTo: Party,
issuerPartyRef: OpaqueBytes): AbstractCashFlow.Result {
// TODO: pass notary in as request parameter
val notaryParty = serviceHub.networkMapCache.notaryNodes[0].notaryIdentity
// invoke Cash subflow to issue Asset
progressTracker.currentStep = ISSUING
val issueRecipient = serviceHub.myInfo.legalIdentity
val issueCashFlow = CashIssueFlow(amount, issuerPartyRef, issueRecipient, notaryParty, anonymous = false)
val issueTx = subFlow(issueCashFlow)
// NOTE: issueCashFlow performs a Broadcast (which stores a local copy of the txn to the ledger)
// short-circuit when issuing to self
if (issueTo == serviceHub.myInfo.legalIdentity)
return issueTx
// now invoke Cash subflow to Move issued assetType to issue requester
progressTracker.currentStep = TRANSFERRING
val moveCashFlow = CashPaymentFlow(amount, issueTo, anonymous = false)
val moveTx = subFlow(moveCashFlow)
// NOTE: CashFlow PayCash calls FinalityFlow which performs a Broadcast (which stores a local copy of the txn to the ledger)
return moveTx
}
}
}

View File

@ -0,0 +1,100 @@
package net.corda.docs
import net.corda.contracts.getCashBalances
import net.corda.core.contracts.*
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.flows.CashIssueFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_NOTARY_KEY
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.util.*
class CustomVaultQueryTest {
lateinit var mockNet: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
lateinit var nodeA: MockNetwork.MockNode
lateinit var nodeB: MockNetwork.MockNode
@Before
fun setup() {
mockNet = MockNetwork(threadPerNode = true)
val notaryService = ServiceInfo(ValidatingNotaryService.type)
notaryNode = mockNet.createNode(
legalName = DUMMY_NOTARY.name,
overrideServices = mapOf(notaryService to DUMMY_NOTARY_KEY),
advertisedServices = *arrayOf(ServiceInfo(NetworkMapService.type), notaryService))
nodeA = mockNet.createPartyNode(notaryNode.network.myAddress)
nodeB = mockNet.createPartyNode(notaryNode.network.myAddress)
nodeA.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
nodeA.installCordaService(CustomVaultQuery.Service::class.java)
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `test custom vault query`() {
// issue some cash in several currencies
issueCashForCurrency(POUNDS(1000))
issueCashForCurrency(DOLLARS(900))
issueCashForCurrency(SWISS_FRANCS(800))
val (cashBalancesOriginal, _) = getBalances()
// top up all currencies (by double original)
topUpCurrencies()
val (cashBalancesAfterTopup, _) = getBalances()
Assert.assertEquals(cashBalancesOriginal[GBP]?.times(2), cashBalancesAfterTopup[GBP])
Assert.assertEquals(cashBalancesOriginal[USD]?.times(2) , cashBalancesAfterTopup[USD])
Assert.assertEquals(cashBalancesOriginal[CHF]?.times( 2), cashBalancesAfterTopup[CHF])
}
private fun issueCashForCurrency(amountToIssue: Amount<Currency>) {
// Use NodeA as issuer and create some dollars
val flowHandle1 = nodeA.services.startFlow(CashIssueFlow(amountToIssue,
OpaqueBytes.of(0x01),
nodeA.info.legalIdentity,
notaryNode.info.notaryIdentity,
false))
// Wait for the flow to stop and print
flowHandle1.resultFuture.getOrThrow()
}
private fun topUpCurrencies() {
val flowHandle1 = nodeA.services.startFlow(TopupIssuerFlow.TopupIssuanceRequester(
nodeA.info.legalIdentity,
OpaqueBytes.of(0x01),
nodeA.info.legalIdentity,
notaryNode.info.notaryIdentity))
flowHandle1.resultFuture.getOrThrow()
}
private fun getBalances(): Pair<Map<Currency, Amount<Currency>>, Map<Currency, Amount<Currency>>> {
// Print out the balances
val balancesNodesA =
nodeA.database.transaction {
nodeA.services.getCashBalances()
}
println("BalanceA\n" + balancesNodesA)
val balancesNodesB =
nodeB.database.transaction {
nodeB.services.getCashBalances()
}
println("BalanceB\n" + balancesNodesB)
return Pair(balancesNodesA, balancesNodesB)
}
}

View File

@ -62,6 +62,8 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.*
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger
@ -77,6 +79,7 @@ import java.security.KeyPair
import java.security.KeyStoreException
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Clock
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@ -814,6 +817,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
super.recordTransactions(txs)
}
}
override fun jdbcSession(): Connection = database.createSession()
}
}

View File

@ -35,6 +35,13 @@ class CordaPersistence(var dataSource: HikariDataSource, databaseProperties: Pro
return DatabaseTransactionManager.currentOrNew(transactionIsolationLevel)
}
fun createSession(): Connection {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
DatabaseTransactionManager.dataSource = this
val ctx = DatabaseTransactionManager.currentOrNull()
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
}
fun <T> isolatedTransaction(block: DatabaseTransaction.() -> T): T {
val context = DatabaseTransactionManager.setThreadLocalTx(null)
return try {

View File

@ -49,36 +49,34 @@ public class VaultQueryJavaTests extends TestDependencyInjectionBase {
public void setUp() {
Properties dataSourceProps = makeTestDataSourceProperties(SecureHash.randomSHA256().toString());
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties());
Set<MappedSchema> customSchemas = new HashSet<>(Collections.singletonList(DummyLinearStateSchemaV1.INSTANCE));
HibernateConfiguration hibernateConfig = new HibernateConfiguration(new NodeSchemaService(customSchemas), makeTestDatabaseProperties());
database.transaction(
statement -> { services = new MockServices(getMEGA_CORP_KEY()) {
database.transaction(statement -> {
Set<MappedSchema> customSchemas = new HashSet<>(Collections.singletonList(DummyLinearStateSchemaV1.INSTANCE));
HibernateConfiguration hibernateConfig = new HibernateConfiguration(new NodeSchemaService(customSchemas), makeTestDatabaseProperties());
services = new MockServices(getMEGA_CORP_KEY()) {
@NotNull
@Override
public VaultService getVaultService() {
if (vaultSvc != null) return vaultSvc;
return makeVaultService(dataSourceProps, hibernateConfig);
}
@NotNull
@Override
public VaultQueryService getVaultQueryService() {
return new HibernateVaultQueryImpl(hibernateConfig, getVaultService().getUpdatesPublisher());
}
@NotNull
@Override
public VaultQueryService getVaultQueryService() {
return new HibernateVaultQueryImpl(hibernateConfig, vaultSvc.getUpdatesPublisher());
}
@Override
public void recordTransactions(@NotNull Iterable<SignedTransaction> txs) {
for (SignedTransaction stx : txs) {
getValidatedTransactions().addTransaction(stx);
}
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(SignedTransaction::getTx);
getVaultService().notifyAll(wtxn.collect(Collectors.toList()));
}
};
@Override
public void recordTransactions(@NotNull Iterable<SignedTransaction> txs) {
for (SignedTransaction stx : txs) {
getValidatedTransactions().addTransaction(stx);
}
Stream<WireTransaction> wtxn = StreamSupport.stream(txs.spliterator(), false).map(SignedTransaction::getTx);
vaultSvc.notifyAll(wtxn.collect(Collectors.toList()));
}
};
vaultSvc = services.getVaultService();
vaultQuerySvc = services.getVaultQueryService();
return services;
});
}

View File

@ -21,6 +21,7 @@ import net.corda.testing.node.MockAttachmentStorage
import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStateMachineRecordedTransactionMappingStorage
import net.corda.testing.node.MockTransactionStorage
import java.sql.Connection
import java.time.Clock
open class MockServiceHubInternal(
@ -77,4 +78,6 @@ open class MockServiceHubInternal(
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? = null
override fun jdbcSession(): Connection = database.createSession()
}

View File

@ -4,6 +4,7 @@ import net.corda.contracts.asset.Cash
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
import net.corda.contracts.asset.DummyFungibleContract
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toBase58String
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
@ -34,6 +35,7 @@ import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.hibernate.SessionFactory
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.time.Instant
@ -64,17 +66,10 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
val defaultDatabaseProperties = makeTestDatabaseProperties()
database = configureDatabase(dataSourceProps, defaultDatabaseProperties)
val customSchemas = setOf(VaultSchemaV1, CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3)
database.transaction {
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), makeTestDatabaseProperties())
services = object : MockServices(BOB_KEY) {
override val vaultService: VaultService get() {
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
return vaultService
}
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
@ -83,7 +78,9 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
override fun jdbcSession() = database.createSession()
}
hibernatePersister = services.hibernatePersister
}
setUpDb()
@ -852,4 +849,26 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
assertThat(queryResults).hasSize(6)
}
/**
* Test invoking SQL query using JDBC connection (session)
*/
@Test
fun `test calling an arbitrary JDBC native query`() {
// DOCSTART JdbcSession
val nativeQuery = "SELECT v.transaction_id, v.output_index FROM vault_states v WHERE v.state_status = 0"
database.transaction {
val jdbcSession = database.createSession()
val prepStatement = jdbcSession.prepareStatement(nativeQuery)
val rs = prepStatement.executeQuery()
// DOCEND JdbcSession
var count = 0
while (rs.next()) {
val stateRef = StateRef(SecureHash.parse(rs.getString(1)), rs.getInt(2))
Assert.assertTrue(cashStates.map { it.ref }.contains(stateRef))
count++
}
Assert.assertEquals(cashStates.count(), count)
}
}
}

View File

@ -243,8 +243,9 @@ class VaultQueryTests : TestDependencyInjectionBase() {
stateRefs.addAll(issuedStateRefs)
val spentStates = services.consumeCash(25.DOLLARS)
var spentStateRefs = spentStates.states.map { it.ref }.toList()
stateRefs.addAll(spentStateRefs)
var consumedStateRefs = spentStates.consumed.map { it.ref }.toList()
var producedStateRefs = spentStates.produced.map { it.ref }.toList()
stateRefs.addAll(consumedStateRefs.plus(producedStateRefs))
val sortAttribute = SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF)
val criteria = VaultQueryCriteria()

View File

@ -6,11 +6,13 @@ import net.corda.contracts.Commodity
import net.corda.contracts.DealState
import net.corda.contracts.asset.*
import net.corda.core.contracts.*
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.Vault
import net.corda.core.toFuture
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -19,6 +21,7 @@ import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_NOTARY_KEY
import java.security.KeyPair
import java.security.PublicKey
import java.time.Duration
import java.time.Instant
import java.time.Instant.now
import java.util.*
@ -223,7 +226,9 @@ fun ServiceHub.evolveLinearStates(linearStates: List<StateAndRef<LinearState>>)
fun ServiceHub.evolveLinearState(linearState: StateAndRef<LinearState>) : StateAndRef<LinearState> = consumeAndProduce(linearState)
@JvmOverloads
fun ServiceHub.consumeCash(amount: Amount<Currency>, to: Party = CHARLIE): Vault<Cash.State> {
fun ServiceHub.consumeCash(amount: Amount<Currency>, to: Party = CHARLIE): Vault.Update<ContractState> {
val update = vaultService.rawUpdates.toFuture()
// A tx that spends our money.
val spendTX = TransactionBuilder(DUMMY_NOTARY).apply {
vaultService.generateSpend(this, amount, to)
@ -232,9 +237,5 @@ fun ServiceHub.consumeCash(amount: Amount<Currency>, to: Party = CHARLIE): Vault
recordTransactions(spendTX)
// Get all the StateRefs of all the generated transactions.
val states = spendTX.tx.outputs.indices.map { i -> spendTX.tx.outRef<Cash.State>(i) }
return Vault(states)
return update.getOrThrow(Duration.ofSeconds(3))
}

View File

@ -39,6 +39,7 @@ import java.io.InputStream
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
import java.sql.Connection
import java.time.Clock
import java.util.*
import java.util.jar.JarInputStream
@ -51,6 +52,7 @@ import java.util.jar.JarInputStream
* building chains of transactions and verifying them. It isn't sufficient for testing flows however.
*/
open class MockServices(vararg val keys: KeyPair) : ServiceHub {
constructor() : this(generateKeyPair())
val key: KeyPair get() = keys.first()
@ -80,13 +82,17 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub {
}
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
lateinit var hibernatePersister: HibernateObserver
fun makeVaultService(dataSourceProps: Properties, hibernateConfig: HibernateConfiguration = HibernateConfiguration(NodeSchemaService(), makeTestDatabaseProperties())): VaultService {
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
HibernateObserver(vaultService.rawUpdates, hibernateConfig)
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
return vaultService
}
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T = throw IllegalArgumentException("${type.name} not found")
override fun jdbcSession(): Connection = throw UnsupportedOperationException()
}
class MockKeyManagementService(val identityService: IdentityService,