Remove getCashBalances first part

More removal of getCashBalances

Get rid of duplicated code

Move onto new getCashBalance extension methods

Move onto new getCashBalance extension methods

Correct balance query code

Address PR request comments

Address PR request comments

Address PR request comments
This commit is contained in:
Matthew Nesbit
2017-07-12 10:00:26 +01:00
parent d6fcf2650f
commit b4ca0cdde9
19 changed files with 201 additions and 263 deletions

View File

@ -57,7 +57,6 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
import net.corda.node.services.transactions.*
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
import net.corda.node.services.vault.HibernateVaultQueryImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
@ -485,7 +484,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeVaultObservers() {
VaultSoftLockManager(services.vaultService, smm)
CashBalanceAsMetricsObserver(services, database)
ScheduledActivityObserver(services)
HibernateObserver(services.vaultService.rawUpdates, HibernateConfiguration(services.schemaService))
}

View File

@ -1,6 +1,5 @@
package net.corda.node.internal
import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UpgradedContract
@ -30,7 +29,6 @@ import rx.Observable
import java.io.InputStream
import java.security.PublicKey
import java.time.Instant
import java.util.*
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
@ -111,12 +109,6 @@ class CordaRPCOpsImpl(
}
}
override fun getCashBalances(): Map<Currency, Amount<Currency>> {
return database.transaction {
services.vaultService.cashBalances
}
}
override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
val stateMachine = startFlow(logicType, args)
return FlowProgressHandleImpl(

View File

@ -1,44 +0,0 @@
package net.corda.node.services.vault
import com.codahale.metrics.Gauge
import net.corda.core.node.services.VaultService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.CordaPersistence
import java.util.*
/**
* This class observes the vault and reflect current cash balances as exposed metrics in the monitoring service.
*/
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: CordaPersistence) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
serviceHubInternal.vaultService.updates.subscribe { _ ->
exportCashBalancesViaMetrics(serviceHubInternal.vaultService)
}
}
private class BalanceMetric : Gauge<Long> {
@Volatile var pennies = 0L
override fun getValue(): Long? = pennies
}
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
private fun exportCashBalancesViaMetrics(vault: VaultService) {
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
// be commercially sensitive info that the sysadmins aren't even meant to know.
//
// Note: exported as pennies.
val m = serviceHubInternal.monitoringService.metrics
database.transaction {
for ((key, value) in vault.cashBalances) {
val metric = balanceMetrics.getOrPut(key) {
val newMetric = BalanceMetric()
m.register("VaultBalances.${key}Pennies", newMetric)
newMetric
}
metric.pennies = value.quantity
}
}
}
}

View File

@ -36,8 +36,10 @@ import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.*
import net.corda.node.services.database.RequeryConfiguration
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.vault.schemas.requery.*
import net.corda.node.services.vault.schemas.requery.Models
import net.corda.node.services.vault.schemas.requery.VaultSchema
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
import net.corda.node.services.vault.schemas.requery.VaultTxnNoteEntity
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import rx.Observable
@ -124,51 +126,6 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
return update
}
// TODO: consider moving this logic outside the vault
// TODO: revisit the concurrency safety of this logic when we move beyond single threaded SMM.
// For example, we update currency totals in a non-deterministic order and so expose ourselves to deadlock.
private fun maybeUpdateCashBalances(update: Vault.Update) {
if (update.containsType<Cash.State>()) {
val consumed = sumCashStates(update.consumed)
val produced = sumCashStates(update.produced)
(produced.keys + consumed.keys).map { currency ->
val producedAmount = produced[currency] ?: Amount(0, currency)
val consumedAmount = consumed[currency] ?: Amount(0, currency)
val cashBalanceEntity = VaultCashBalancesEntity()
cashBalanceEntity.currency = currency.currencyCode
cashBalanceEntity.amount = producedAmount.quantity - consumedAmount.quantity
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
val state = findByKey(VaultCashBalancesEntity::class, currency.currencyCode)
state?.run {
amount += producedAmount.quantity - consumedAmount.quantity
}
upsert(state ?: cashBalanceEntity)
val total = state?.amount ?: cashBalanceEntity.amount
log.trace { "Updating Cash balance for $currency by ${cashBalanceEntity.amount} pennies (total: $total)" }
}
}
}
}
@Suppress("UNCHECKED_CAST")
private fun sumCashStates(states: Iterable<StateAndRef<ContractState>>): Map<Currency, Amount<Currency>> {
return states.mapNotNull { (it.state.data as? FungibleAsset<Currency>)?.amount }
.groupBy { it.token.product }
.mapValues { it.value.map { Amount(it.quantity, it.token.product) }.sumOrThrow() }
}
override val cashBalances: Map<Currency, Amount<Currency>> get() {
val cashBalancesByCurrency =
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
val balances = select(VaultSchema.VaultCashBalances::class)
balances.get().toList()
}
return cashBalancesByCurrency.associateBy({ Currency.getInstance(it.currency) },
{ Amount(it.amount, Currency.getInstance(it.currency)) })
}
override val rawUpdates: Observable<Vault.Update>
get() = mutex.locked { _rawUpdatesPublisher }
@ -232,7 +189,6 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, ourKeys) }
if (netDelta != Vault.NoUpdate) {
recordUpdate(netDelta)
maybeUpdateCashBalances(netDelta)
mutex.locked {
// flowId required by SoftLockManager to perform auto-registration of soft locks for new states
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid

View File

@ -2,24 +2,26 @@ package net.corda.node.services.vault
import net.corda.contracts.asset.Cash
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
import net.corda.contracts.getCashBalance
import net.corda.core.contracts.*
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.AnonymousParty
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.unconsumedStates
import net.corda.core.node.services.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.toNonEmptySet
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.schemas.CommercialPaperSchemaV1
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.schemas.DummyLinearStateSchemaV1
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue
class NodeVaultServiceTest : TestDependencyInjectionBase() {
@ -45,8 +46,10 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps)
database.transaction {
val customSchemas = setOf(CommercialPaperSchemaV1, DummyLinearStateSchemaV1)
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas))
services = object : MockServices() {
override val vaultService: VaultService = makeVaultService(dataSourceProps)
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
@ -55,6 +58,8 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
override val vaultQueryService: VaultQueryService = HibernateVaultQueryImpl(hibernateConfig, vaultService.updatesPublisher)
}
}
}
@ -155,7 +160,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val vaultStates =
database.transaction {
assertNull(vaultSvc.cashBalances[USD])
assertEquals(0.DOLLARS, services.getCashBalance(USD))
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = (vaultStates.states.map { it.ref }).toNonEmptySet()
@ -211,7 +216,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val vaultStates =
database.transaction {
assertNull(vaultSvc.cashBalances[USD])
assertEquals(0.DOLLARS, services.getCashBalance(USD))
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }
@ -238,7 +243,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val vaultStates =
database.transaction {
assertNull(vaultSvc.cashBalances[USD])
assertEquals(0.DOLLARS, services.getCashBalance(USD))
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = (vaultStates.states.map { it.ref }).toNonEmptySet()
@ -264,7 +269,7 @@ class NodeVaultServiceTest : TestDependencyInjectionBase() {
val vaultStates =
database.transaction {
assertNull(vaultSvc.cashBalances[USD])
assertEquals(0.DOLLARS, services.getCashBalance(USD))
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
}
val stateRefsToSoftLock = vaultStates.states.map { it.ref }

View File

@ -2,18 +2,24 @@ package net.corda.node.services.vault
import net.corda.contracts.asset.Cash
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
import net.corda.contracts.getCashBalance
import net.corda.core.contracts.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.node.services.VaultQueryService
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.consumedStates
import net.corda.core.node.services.unconsumedStates
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.schemas.CommercialPaperSchemaV1
import net.corda.testing.*
import net.corda.testing.contracts.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.schemas.DummyLinearStateSchemaV1
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
@ -23,7 +29,6 @@ import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertNull
// TODO: Move this to the cash contract tests once mock services are further split up.
@ -39,8 +44,10 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps)
database.transaction {
val customSchemas = setOf(CommercialPaperSchemaV1, DummyLinearStateSchemaV1)
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas))
services = object : MockServices() {
override val vaultService: VaultService = makeVaultService(dataSourceProps)
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
@ -49,6 +56,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
override val vaultQueryService: VaultQueryService = HibernateVaultQueryImpl(hibernateConfig, vaultService.updatesPublisher)
}
}
}
@ -88,7 +97,7 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
Cash().generateIssue(usefulBuilder, 100.DOLLARS `issued by` MEGA_CORP.ref(1), AnonymousParty(freshKey), DUMMY_NOTARY)
val usefulTX = megaCorpServices.signInitialTransaction(usefulBuilder)
assertNull(vault.cashBalances[USD])
assertEquals(0.DOLLARS, services.getCashBalance(USD))
services.recordTransactions(usefulTX)
// A tx that spends our money.
@ -97,7 +106,7 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
val spendPTX = services.signInitialTransaction(spendTXBuilder, freshKey)
val spendTX = notaryServices.addSignature(spendPTX)
assertEquals(100.DOLLARS, vault.cashBalances[USD])
assertEquals(100.DOLLARS, services.getCashBalance(USD))
// A tx that doesn't send us anything.
val irrelevantBuilder = TransactionType.General.Builder(DUMMY_NOTARY)
@ -107,10 +116,10 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
val irrelevantTX = notaryServices.addSignature(irrelevantPTX)
services.recordTransactions(irrelevantTX)
assertEquals(100.DOLLARS, vault.cashBalances[USD])
assertEquals(100.DOLLARS, services.getCashBalance(USD))
services.recordTransactions(spendTX)
assertEquals(20.DOLLARS, vault.cashBalances[USD])
assertEquals(20.DOLLARS, services.getCashBalance(USD))
// TODO: Flesh out these tests as needed.
}
@ -126,7 +135,7 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
issuedBy = MEGA_CORP.ref(1),
issuerKey = MEGA_CORP_KEY,
ownedBy = AnonymousParty(freshKey))
println("Cash balance: ${vault.cashBalances[USD]}")
println("Cash balance: ${services.getCashBalance(USD)}")
assertThat(vault.unconsumedStates<Cash.State>()).hasSize(10)
assertThat(vault.softLockedStates<Cash.State>()).hasSize(0)
@ -149,7 +158,7 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
services.recordTransactions(txn1)
println("txn1: Cash balance: ${vault.cashBalances[USD]}")
println("txn1: Cash balance: ${services.getCashBalance(USD)}")
println("""txn1 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
@ -179,7 +188,7 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
LOCKED: ${vault.softLockedStates<Cash.State>().count()} : ${vault.softLockedStates<Cash.State>()}
""")
services.recordTransactions(txn2)
println("txn2: Cash balance: ${vault.cashBalances[USD]}")
println("txn2: Cash balance: ${services.getCashBalance(USD)}")
println("""txn2 states:
UNCONSUMED: ${vault.unconsumedStates<Cash.State>().count()} : ${vault.unconsumedStates<Cash.State>()},
CONSUMED: ${vault.consumedStates<Cash.State>().count()} : ${vault.consumedStates<Cash.State>()},
@ -197,8 +206,8 @@ class VaultWithCashTest : TestDependencyInjectionBase() {
countDown.await()
database.transaction {
println("Cash balance: ${vault.cashBalances[USD]}")
assertThat(vault.cashBalances[USD]).isIn(DOLLARS(20), DOLLARS(40))
println("Cash balance: ${services.getCashBalance(USD)}")
assertThat(services.getCashBalance(USD)).isIn(DOLLARS(20), DOLLARS(40))
}
}