mirror of
https://github.com/corda/corda.git
synced 2025-06-22 09:08:49 +00:00
Pull out Wallet observable into it's own branch
Review feedback Review feedback Apply feedback from previous PR Apply feedback from previous PR Apply feedback from previous PR PR feedback PR feedback PR feedback
This commit is contained in:
@ -34,6 +34,7 @@ import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||
import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.node.services.wallet.CashBalanceAsMetricsObserver
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.utilities.ANSIProgressObserver
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
@ -140,6 +141,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
// TODO: this model might change but for now it provides some de-coupling
|
||||
// Add SMM observers
|
||||
ANSIProgressObserver(smm)
|
||||
// Add wallet observers
|
||||
CashBalanceAsMetricsObserver(services)
|
||||
|
||||
startMessagingService()
|
||||
networkMapRegistrationFuture = registerWithNetworkMap()
|
||||
|
@ -9,7 +9,6 @@ import com.r3corda.core.days
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.seconds
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import java.time.Instant
|
||||
|
||||
@ -27,7 +26,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
|
||||
val buyer = banks[buyerBankIndex]
|
||||
val seller = banks[sellerBankIndex]
|
||||
|
||||
(buyer.services.walletService as NodeWalletService).fillWithSomeTestCash(notary.info.identity, 1500.DOLLARS)
|
||||
WalletFiller.fillWithSomeTestCash(buyer.services, notary.info.identity, 1500.DOLLARS)
|
||||
|
||||
val issuance = run {
|
||||
val tx = CommercialPaper().generateIssue(seller.info.identity.ref(1, 2, 3), 1100.DOLLARS, Instant.now() + 10.days, notary.info.identity)
|
||||
|
@ -0,0 +1,62 @@
|
||||
package com.r3corda.node.internal.testing
|
||||
|
||||
import com.r3corda.contracts.cash.Cash
|
||||
import com.r3corda.core.contracts.Amount
|
||||
import com.r3corda.core.contracts.TransactionBuilder
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import java.util.*
|
||||
|
||||
object WalletFiller {
|
||||
|
||||
/**
|
||||
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
|
||||
* to the wallet.
|
||||
*
|
||||
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
|
||||
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
|
||||
* the central bank, well ... that'd be a different story altogether.
|
||||
*/
|
||||
fun fillWithSomeTestCash(services: ServiceHub, notary: Party, howMuch: Amount<Currency>, atLeastThisManyStates: Int = 3,
|
||||
atMostThisManyStates: Int = 10, rng: Random = Random()) {
|
||||
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
|
||||
|
||||
val myIdentity = services.storageService.myLegalIdentity
|
||||
val myKey = services.storageService.myLegalIdentityKey
|
||||
|
||||
// We will allocate one state to one transaction, for simplicities sake.
|
||||
val cash = Cash()
|
||||
val transactions = amounts.map { pennies ->
|
||||
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
|
||||
// this field as there's no other database or source of truth we need to sync with.
|
||||
val depositRef = myIdentity.ref(0)
|
||||
|
||||
val issuance = TransactionBuilder()
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
cash.generateIssue(issuance, Amount(pennies, howMuch.token), depositRef, freshKey.public, notary)
|
||||
issuance.signWith(myKey)
|
||||
|
||||
return@map issuance.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
services.recordTransactions(transactions)
|
||||
}
|
||||
|
||||
private fun calculateRandomlySizedAmounts(howMuch: Amount<Currency>, min: Int, max: Int, rng: Random): LongArray {
|
||||
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
|
||||
val amounts = LongArray(numStates)
|
||||
val baseSize = howMuch.quantity / numStates
|
||||
var filledSoFar = 0L
|
||||
for (i in 0..numStates - 1) {
|
||||
if (i < numStates - 1) {
|
||||
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
|
||||
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
|
||||
filledSoFar += baseSize
|
||||
} else {
|
||||
// Handle inexact rounding.
|
||||
amounts[i] = howMuch.quantity - filledSoFar
|
||||
}
|
||||
}
|
||||
return amounts
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package com.r3corda.node.services.wallet
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* This class observes the wallet and reflect current cash balances as exposed metrics in the monitoring service.
|
||||
*/
|
||||
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
|
||||
|
||||
init {
|
||||
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
|
||||
serviceHubInternal.walletService.updates.subscribe { update ->
|
||||
exportCashBalancesViaMetrics(serviceHubInternal.walletService.currentWallet)
|
||||
}
|
||||
}
|
||||
|
||||
private class BalanceMetric : Gauge<Long> {
|
||||
@Volatile var pennies = 0L
|
||||
override fun getValue(): Long? = pennies
|
||||
}
|
||||
|
||||
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
|
||||
|
||||
private fun exportCashBalancesViaMetrics(wallet: Wallet) {
|
||||
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
|
||||
// be commercially sensitive info that the sysadmins aren't even meant to know.
|
||||
//
|
||||
// Note: exported as pennies.
|
||||
val m = serviceHubInternal.monitoringService.metrics
|
||||
for (balance in wallet.cashBalances) {
|
||||
val metric = balanceMetrics.getOrPut(balance.key) {
|
||||
val newMetric = BalanceMetric()
|
||||
m.register("WalletBalances.${balance.key}Pennies", newMetric)
|
||||
newMetric
|
||||
}
|
||||
metric.pennies = balance.value.quantity
|
||||
}
|
||||
}
|
||||
}
|
@ -1,10 +1,7 @@
|
||||
package com.r3corda.node.services.wallet
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.r3corda.contracts.cash.Cash
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.core.node.services.WalletService
|
||||
@ -12,6 +9,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -36,6 +35,11 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
|
||||
|
||||
override val currentWallet: Wallet get() = mutex.locked { wallet }
|
||||
|
||||
private val _updatesPublisher = PublishSubject.create<Wallet.Update>()
|
||||
|
||||
override val updates: Observable<Wallet.Update>
|
||||
get() = _updatesPublisher
|
||||
|
||||
/**
|
||||
* Returns a snapshot of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
|
||||
* which we have no cash evaluate to null, not 0.
|
||||
@ -66,14 +70,24 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
|
||||
//
|
||||
// ... and many other things .... (Wallet.java in bitcoinj is several thousand lines long)
|
||||
|
||||
mutex.locked {
|
||||
var netDelta = Wallet.NoUpdate
|
||||
val changedWallet = mutex.locked {
|
||||
// Starting from the current wallet, keep applying the transaction updates, calculating a new Wallet each
|
||||
// time, until we get to the result (this is perhaps a bit inefficient, but it's functional and easily
|
||||
// unit tested).
|
||||
wallet = txns.fold(currentWallet) { current, tx -> current.update(tx, ourKeys) }
|
||||
exportCashBalancesViaMetrics(wallet)
|
||||
return wallet
|
||||
val walletAndNetDelta = txns.fold(Pair(currentWallet, Wallet.NoUpdate)) { walletAndDelta, tx ->
|
||||
val (wallet, delta) = walletAndDelta.first.update(tx, ourKeys)
|
||||
val combinedDelta = delta + walletAndDelta.second
|
||||
Pair(wallet, combinedDelta)
|
||||
}
|
||||
wallet = walletAndNetDelta.first
|
||||
netDelta = walletAndNetDelta.second
|
||||
return@locked wallet
|
||||
}
|
||||
if (netDelta != Wallet.NoUpdate) {
|
||||
_updatesPublisher.onNext(netDelta)
|
||||
}
|
||||
return changedWallet
|
||||
}
|
||||
|
||||
private fun isRelevant(state: ContractState, ourKeys: Set<PublicKey>): Boolean {
|
||||
@ -87,7 +101,7 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
|
||||
}
|
||||
}
|
||||
|
||||
private fun Wallet.update(tx: WireTransaction, ourKeys: Set<PublicKey>): Wallet {
|
||||
private fun Wallet.update(tx: WireTransaction, ourKeys: Set<PublicKey>): Pair<Wallet, Wallet.Update> {
|
||||
val ourNewStates = tx.outputs.
|
||||
filter { isRelevant(it, ourKeys) }.
|
||||
map { tx.outRef<ContractState>(it) }
|
||||
@ -98,9 +112,11 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
|
||||
// Is transaction irrelevant?
|
||||
if (consumed.isEmpty() && ourNewStates.isEmpty()) {
|
||||
log.trace { "tx ${tx.id} was irrelevant to this wallet, ignoring" }
|
||||
return this
|
||||
return Pair(this, Wallet.NoUpdate)
|
||||
}
|
||||
|
||||
val change = Wallet.Update(consumed, HashSet(ourNewStates))
|
||||
|
||||
// And calculate the new wallet.
|
||||
val newStates = states.filter { it.ref !in consumed } + ourNewStates
|
||||
|
||||
@ -108,82 +124,6 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
|
||||
"Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}"
|
||||
}
|
||||
|
||||
return WalletImpl(newStates)
|
||||
}
|
||||
|
||||
private class BalanceMetric : Gauge<Long> {
|
||||
@Volatile var pennies = 0L
|
||||
override fun getValue(): Long? = pennies
|
||||
}
|
||||
|
||||
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
|
||||
|
||||
private fun exportCashBalancesViaMetrics(wallet: Wallet) {
|
||||
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
|
||||
// be commercially sensitive info that the sysadmins aren't even meant to know.
|
||||
//
|
||||
// Note: exported as pennies.
|
||||
val m = services.monitoringService.metrics
|
||||
for (balance in wallet.cashBalances) {
|
||||
val metric = balanceMetrics.getOrPut(balance.key) {
|
||||
val newMetric = BalanceMetric()
|
||||
m.register("WalletBalances.${balance.key}Pennies", newMetric)
|
||||
newMetric
|
||||
}
|
||||
metric.pennies = balance.value.quantity
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
|
||||
* to the wallet.
|
||||
*
|
||||
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
|
||||
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
|
||||
* the central bank, well ... that'd be a different story altogether.
|
||||
*
|
||||
* TODO: Move this out of NodeWalletService
|
||||
*/
|
||||
fun fillWithSomeTestCash(notary: Party, howMuch: Amount<Currency>, atLeastThisManyStates: Int = 3,
|
||||
atMostThisManyStates: Int = 10, rng: Random = Random()) {
|
||||
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
|
||||
|
||||
val myIdentity = services.storageService.myLegalIdentity
|
||||
val myKey = services.storageService.myLegalIdentityKey
|
||||
|
||||
// We will allocate one state to one transaction, for simplicities sake.
|
||||
val cash = Cash()
|
||||
val transactions = amounts.map { pennies ->
|
||||
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
|
||||
// this field as there's no other database or source of truth we need to sync with.
|
||||
val depositRef = myIdentity.ref(0)
|
||||
|
||||
val issuance = TransactionBuilder()
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
cash.generateIssue(issuance, Amount(pennies, howMuch.token), depositRef, freshKey.public, notary)
|
||||
issuance.signWith(myKey)
|
||||
|
||||
return@map issuance.toSignedTransaction(true)
|
||||
}
|
||||
|
||||
services.recordTransactions(transactions)
|
||||
}
|
||||
|
||||
private fun calculateRandomlySizedAmounts(howMuch: Amount<Currency>, min: Int, max: Int, rng: Random): LongArray {
|
||||
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
|
||||
val amounts = LongArray(numStates)
|
||||
val baseSize = howMuch.quantity / numStates
|
||||
var filledSoFar = 0L
|
||||
for (i in 0..numStates - 1) {
|
||||
if (i < numStates - 1) {
|
||||
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
|
||||
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
|
||||
filledSoFar += baseSize
|
||||
} else {
|
||||
// Handle inexact rounding.
|
||||
amounts[i] = howMuch.quantity - filledSoFar
|
||||
}
|
||||
}
|
||||
return amounts
|
||||
return Pair(WalletImpl(newStates), change)
|
||||
}
|
||||
}
|
||||
|
@ -21,13 +21,13 @@ import com.r3corda.core.seconds
|
||||
import com.r3corda.core.testing.*
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.node.internal.testing.WalletFiller
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.network.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import com.r3corda.node.services.wallet.WalletImpl
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
@ -93,7 +93,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY)
|
||||
val bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY)
|
||||
|
||||
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(DUMMY_NOTARY, 2000.DOLLARS)
|
||||
WalletFiller.fillWithSomeTestCash(bobNode.services, DUMMY_NOTARY, 2000.DOLLARS)
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
|
||||
notaryNode.info.identity, null).second
|
||||
|
||||
@ -144,7 +144,7 @@ class TwoPartyTradeProtocolTests {
|
||||
|
||||
net.runNetwork() // Clear network map registration messages
|
||||
|
||||
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(DUMMY_NOTARY, 2000.DOLLARS)
|
||||
WalletFiller.fillWithSomeTestCash(bobNode.services, DUMMY_NOTARY, 2000.DOLLARS)
|
||||
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
|
||||
notaryNode.info.identity, null).second
|
||||
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)
|
||||
|
@ -10,6 +10,7 @@ import com.r3corda.core.node.services.testing.MockKeyManagementService
|
||||
import com.r3corda.core.node.services.testing.MockStorageService
|
||||
import com.r3corda.core.testing.*
|
||||
import com.r3corda.core.utilities.BriefLogFormatter
|
||||
import com.r3corda.node.internal.testing.WalletFiller
|
||||
import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -42,7 +43,7 @@ class NodeWalletServiceTest {
|
||||
|
||||
kms.nextKeys += Array(3) { ALICE_KEY }
|
||||
// Fix the PRNG so that we get the same splits every time.
|
||||
wallet.fillWithSomeTestCash(DUMMY_NOTARY, 100.DOLLARS, 3, 3, Random(0L))
|
||||
WalletFiller.fillWithSomeTestCash(services, DUMMY_NOTARY, 100.DOLLARS, 3, 3, Random(0L))
|
||||
|
||||
val w = wallet.currentWallet
|
||||
assertEquals(3, w.states.size)
|
||||
|
Reference in New Issue
Block a user