Merged in cor-133-observable-wallet (pull request #135)

Pull out Wallet observable into it's own branch
This commit is contained in:
Rick Parker 2016-06-10 17:18:46 +01:00
commit 58d5162782
10 changed files with 262 additions and 95 deletions

View File

@ -3,7 +3,6 @@ package com.r3corda.core.node.services
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.TransactionStorage
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -38,6 +37,37 @@ abstract class Wallet {
* which we have no cash evaluate to null (not present in map), not 0.
*/
abstract val cashBalances: Map<Currency, Amount<Currency>>
/**
* Represents an update observed by the Wallet that will be notified to observers. Include the [StateRef]s of
* transaction outputs that were consumed (inputs) and the [ContractState]s produced (outputs) to/by the transaction
* or transactions observed and the Wallet.
*
* If the Wallet observes multiple transactions simultaneously, where some transactions consume the outputs of some of the
* other transactions observed, then the changes are observed "net" of those.
*/
data class Update(val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) {
/**
* Combine two updates into a single update with the combined inputs and outputs of the two updates but net
* any outputs of the left-hand-side (this) that are consumed by the inputs of the right-hand-side (rhs).
*
* i.e. the net effect in terms of state live-ness of receiving the combined update is the same as receiving this followed by rhs.
*/
operator fun plus(rhs: Update): Update {
val previouslyProduced = produced.map { it.ref }
val previouslyConsumed = consumed
val combined = Wallet.Update(
previouslyConsumed + (rhs.consumed - previouslyProduced),
rhs.produced + produced.filter { it.ref !in rhs.consumed })
return combined
}
}
companion object {
val NoUpdate = Update(emptySet(), emptySet())
}
}
/**
@ -89,6 +119,12 @@ interface WalletService {
/** Same as notifyAll but with a single transaction. */
fun notify(tx: WireTransaction): Wallet = notifyAll(listOf(tx))
/**
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the Wallet will already incorporate
* the update.
*/
val updates: rx.Observable<Wallet.Update>
}
inline fun <reified T : LinearState> WalletService.linearHeadsOfType() = linearHeadsOfType_(T::class.java)

View File

@ -0,0 +1,83 @@
package com.r3corda.core.node
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.testing.DUMMY_NOTARY
import org.junit.Test
import kotlin.test.assertEquals
class WalletUpdateTests {
object DummyContract : Contract {
override fun verify(tx: TransactionForVerification) {
}
override val legalContractReference: SecureHash = SecureHash.sha256("")
}
private class DummyState : ContractState {
override val notary = DUMMY_NOTARY
override val contract = WalletUpdateTests.DummyContract
}
private val stateRef0 = StateRef(SecureHash.randomSHA256(), 0)
private val stateRef1 = StateRef(SecureHash.randomSHA256(), 1)
private val stateRef2 = StateRef(SecureHash.randomSHA256(), 2)
private val stateRef3 = StateRef(SecureHash.randomSHA256(), 3)
private val stateRef4 = StateRef(SecureHash.randomSHA256(), 4)
private val stateAndRef0 = StateAndRef<DummyState>(DummyState(), stateRef0)
private val stateAndRef1 = StateAndRef<DummyState>(DummyState(), stateRef1)
private val stateAndRef2 = StateAndRef<DummyState>(DummyState(), stateRef2)
private val stateAndRef3 = StateAndRef<DummyState>(DummyState(), stateRef3)
private val stateAndRef4 = StateAndRef<DummyState>(DummyState(), stateRef4)
@Test
fun `nothing plus nothing is nothing`() {
val before = Wallet.NoUpdate
val after = before + Wallet.NoUpdate
assertEquals(before, after)
}
@Test
fun `something plus nothing is something`() {
val before = Wallet.Update(setOf(stateRef0, stateRef1), setOf(stateAndRef2, stateAndRef3))
val after = before + Wallet.NoUpdate
assertEquals(before, after)
}
@Test
fun `nothing plus something is something`() {
val before = Wallet.NoUpdate
val after = before + Wallet.Update(setOf(stateRef0, stateRef1), setOf(stateAndRef2, stateAndRef3))
val expected = Wallet.Update(setOf(stateRef0, stateRef1), setOf(stateAndRef2, stateAndRef3))
assertEquals(expected, after)
}
@Test
fun `something plus consume state 0 is something without state 0 output`() {
val before = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef0, stateAndRef1))
val after = before + Wallet.Update(setOf(stateRef0), setOf())
val expected = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef1))
assertEquals(expected, after)
}
@Test
fun `something plus produce state 4 is something with additional state 4 output`() {
val before = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef0, stateAndRef1))
val after = before + Wallet.Update(setOf(), setOf(stateAndRef4))
val expected = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef0, stateAndRef1, stateAndRef4))
assertEquals(expected, after)
}
@Test
fun `something plus consume states 0 and 1, and produce state 4, is something without state 0 and 1 outputs and only state 4 output`() {
val before = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef0, stateAndRef1))
val after = before + Wallet.Update(setOf(stateRef0, stateRef1), setOf(stateAndRef4))
val expected = Wallet.Update(setOf(stateRef2, stateRef3), setOf(stateAndRef4))
assertEquals(expected, after)
}
}

View File

@ -34,6 +34,7 @@ import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.node.services.wallet.CashBalanceAsMetricsObserver
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.ANSIProgressObserver
import com.r3corda.node.utilities.AddOrRemove
@ -140,6 +141,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// TODO: this model might change but for now it provides some de-coupling
// Add SMM observers
ANSIProgressObserver(smm)
// Add wallet observers
CashBalanceAsMetricsObserver(services)
startMessagingService()
networkMapRegistrationFuture = registerWithNetworkMap()

View File

@ -9,7 +9,6 @@ import com.r3corda.core.days
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.protocols.TwoPartyTradeProtocol
import java.time.Instant
@ -27,7 +26,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
val buyer = banks[buyerBankIndex]
val seller = banks[sellerBankIndex]
(buyer.services.walletService as NodeWalletService).fillWithSomeTestCash(notary.info.identity, 1500.DOLLARS)
WalletFiller.fillWithSomeTestCash(buyer.services, notary.info.identity, 1500.DOLLARS)
val issuance = run {
val tx = CommercialPaper().generateIssue(seller.info.identity.ref(1, 2, 3), 1100.DOLLARS, Instant.now() + 10.days, notary.info.identity)

View File

@ -0,0 +1,62 @@
package com.r3corda.node.internal.testing
import com.r3corda.contracts.cash.Cash
import com.r3corda.core.contracts.Amount
import com.r3corda.core.contracts.TransactionBuilder
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.ServiceHub
import java.util.*
object WalletFiller {
/**
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
* to the wallet.
*
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
* the central bank, well ... that'd be a different story altogether.
*/
fun fillWithSomeTestCash(services: ServiceHub, notary: Party, howMuch: Amount<Currency>, atLeastThisManyStates: Int = 3,
atMostThisManyStates: Int = 10, rng: Random = Random()) {
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
val myIdentity = services.storageService.myLegalIdentity
val myKey = services.storageService.myLegalIdentityKey
// We will allocate one state to one transaction, for simplicities sake.
val cash = Cash()
val transactions = amounts.map { pennies ->
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
// this field as there's no other database or source of truth we need to sync with.
val depositRef = myIdentity.ref(0)
val issuance = TransactionBuilder()
val freshKey = services.keyManagementService.freshKey()
cash.generateIssue(issuance, Amount(pennies, howMuch.token), depositRef, freshKey.public, notary)
issuance.signWith(myKey)
return@map issuance.toSignedTransaction(true)
}
services.recordTransactions(transactions)
}
private fun calculateRandomlySizedAmounts(howMuch: Amount<Currency>, min: Int, max: Int, rng: Random): LongArray {
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
val amounts = LongArray(numStates)
val baseSize = howMuch.quantity / numStates
var filledSoFar = 0L
for (i in 0..numStates - 1) {
if (i < numStates - 1) {
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
filledSoFar += baseSize
} else {
// Handle inexact rounding.
amounts[i] = howMuch.quantity - filledSoFar
}
}
return amounts
}
}

View File

@ -0,0 +1,42 @@
package com.r3corda.node.services.wallet
import com.codahale.metrics.Gauge
import com.r3corda.core.node.services.Wallet
import com.r3corda.node.services.api.ServiceHubInternal
import java.util.*
/**
* This class observes the wallet and reflect current cash balances as exposed metrics in the monitoring service.
*/
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
serviceHubInternal.walletService.updates.subscribe { update ->
exportCashBalancesViaMetrics(serviceHubInternal.walletService.currentWallet)
}
}
private class BalanceMetric : Gauge<Long> {
@Volatile var pennies = 0L
override fun getValue(): Long? = pennies
}
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
private fun exportCashBalancesViaMetrics(wallet: Wallet) {
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
// be commercially sensitive info that the sysadmins aren't even meant to know.
//
// Note: exported as pennies.
val m = serviceHubInternal.monitoringService.metrics
for (balance in wallet.cashBalances) {
val metric = balanceMetrics.getOrPut(balance.key) {
val newMetric = BalanceMetric()
m.register("WalletBalances.${balance.key}Pennies", newMetric)
newMetric
}
metric.pennies = balance.value.quantity
}
}
}

View File

@ -1,10 +1,7 @@
package com.r3corda.node.services.wallet
import com.codahale.metrics.Gauge
import com.r3corda.contracts.cash.Cash
import com.r3corda.core.ThreadBox
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.node.services.WalletService
@ -12,6 +9,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
import rx.Observable
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -36,6 +35,11 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
override val currentWallet: Wallet get() = mutex.locked { wallet }
private val _updatesPublisher = PublishSubject.create<Wallet.Update>()
override val updates: Observable<Wallet.Update>
get() = _updatesPublisher
/**
* Returns a snapshot of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
* which we have no cash evaluate to null, not 0.
@ -66,14 +70,24 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
//
// ... and many other things .... (Wallet.java in bitcoinj is several thousand lines long)
mutex.locked {
var netDelta = Wallet.NoUpdate
val changedWallet = mutex.locked {
// Starting from the current wallet, keep applying the transaction updates, calculating a new Wallet each
// time, until we get to the result (this is perhaps a bit inefficient, but it's functional and easily
// unit tested).
wallet = txns.fold(currentWallet) { current, tx -> current.update(tx, ourKeys) }
exportCashBalancesViaMetrics(wallet)
return wallet
val walletAndNetDelta = txns.fold(Pair(currentWallet, Wallet.NoUpdate)) { walletAndDelta, tx ->
val (wallet, delta) = walletAndDelta.first.update(tx, ourKeys)
val combinedDelta = delta + walletAndDelta.second
Pair(wallet, combinedDelta)
}
wallet = walletAndNetDelta.first
netDelta = walletAndNetDelta.second
return@locked wallet
}
if (netDelta != Wallet.NoUpdate) {
_updatesPublisher.onNext(netDelta)
}
return changedWallet
}
private fun isRelevant(state: ContractState, ourKeys: Set<PublicKey>): Boolean {
@ -87,7 +101,7 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
}
}
private fun Wallet.update(tx: WireTransaction, ourKeys: Set<PublicKey>): Wallet {
private fun Wallet.update(tx: WireTransaction, ourKeys: Set<PublicKey>): Pair<Wallet, Wallet.Update> {
val ourNewStates = tx.outputs.
filter { isRelevant(it, ourKeys) }.
map { tx.outRef<ContractState>(it) }
@ -98,9 +112,11 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
// Is transaction irrelevant?
if (consumed.isEmpty() && ourNewStates.isEmpty()) {
log.trace { "tx ${tx.id} was irrelevant to this wallet, ignoring" }
return this
return Pair(this, Wallet.NoUpdate)
}
val change = Wallet.Update(consumed, HashSet(ourNewStates))
// And calculate the new wallet.
val newStates = states.filter { it.ref !in consumed } + ourNewStates
@ -108,82 +124,6 @@ class NodeWalletService(private val services: ServiceHubInternal) : SingletonSer
"Applied tx ${tx.id.prefixChars()} to the wallet: consumed ${consumed.size} states and added ${newStates.size}"
}
return WalletImpl(newStates)
}
private class BalanceMetric : Gauge<Long> {
@Volatile var pennies = 0L
override fun getValue(): Long? = pennies
}
private val balanceMetrics = HashMap<Currency, BalanceMetric>()
private fun exportCashBalancesViaMetrics(wallet: Wallet) {
// This is just for demo purposes. We probably shouldn't expose balances via JMX in a real node as that might
// be commercially sensitive info that the sysadmins aren't even meant to know.
//
// Note: exported as pennies.
val m = services.monitoringService.metrics
for (balance in wallet.cashBalances) {
val metric = balanceMetrics.getOrPut(balance.key) {
val newMetric = BalanceMetric()
m.register("WalletBalances.${balance.key}Pennies", newMetric)
newMetric
}
metric.pennies = balance.value.quantity
}
}
/**
* Creates a random set of between (by default) 3 and 10 cash states that add up to the given amount and adds them
* to the wallet.
*
* The cash is self issued with the current nodes identity, as fetched from the storage service. Thus it
* would not be trusted by any sensible market participant and is effectively an IOU. If it had been issued by
* the central bank, well ... that'd be a different story altogether.
*
* TODO: Move this out of NodeWalletService
*/
fun fillWithSomeTestCash(notary: Party, howMuch: Amount<Currency>, atLeastThisManyStates: Int = 3,
atMostThisManyStates: Int = 10, rng: Random = Random()) {
val amounts = calculateRandomlySizedAmounts(howMuch, atLeastThisManyStates, atMostThisManyStates, rng)
val myIdentity = services.storageService.myLegalIdentity
val myKey = services.storageService.myLegalIdentityKey
// We will allocate one state to one transaction, for simplicities sake.
val cash = Cash()
val transactions = amounts.map { pennies ->
// This line is what makes the cash self issued. We just use zero as our deposit reference: we don't need
// this field as there's no other database or source of truth we need to sync with.
val depositRef = myIdentity.ref(0)
val issuance = TransactionBuilder()
val freshKey = services.keyManagementService.freshKey()
cash.generateIssue(issuance, Amount(pennies, howMuch.token), depositRef, freshKey.public, notary)
issuance.signWith(myKey)
return@map issuance.toSignedTransaction(true)
}
services.recordTransactions(transactions)
}
private fun calculateRandomlySizedAmounts(howMuch: Amount<Currency>, min: Int, max: Int, rng: Random): LongArray {
val numStates = min + Math.floor(rng.nextDouble() * (max - min)).toInt()
val amounts = LongArray(numStates)
val baseSize = howMuch.quantity / numStates
var filledSoFar = 0L
for (i in 0..numStates - 1) {
if (i < numStates - 1) {
// Adjust the amount a bit up or down, to give more realistic amounts (not all identical).
amounts[i] = baseSize + (baseSize / 2 * (rng.nextDouble() - 0.5)).toLong()
filledSoFar += baseSize
} else {
// Handle inexact rounding.
amounts[i] = howMuch.quantity - filledSoFar
}
}
return amounts
return Pair(WalletImpl(newStates), change)
}
}

View File

@ -21,13 +21,13 @@ import com.r3corda.core.seconds
import com.r3corda.core.testing.*
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.internal.testing.WalletFiller
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.persistence.PerFileTransactionStorage
import com.r3corda.node.services.persistence.StorageServiceImpl
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.services.wallet.WalletImpl
import com.r3corda.protocols.TwoPartyTradeProtocol
import org.assertj.core.api.Assertions.assertThat
@ -93,7 +93,7 @@ class TwoPartyTradeProtocolTests {
val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY)
val bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY)
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(DUMMY_NOTARY, 2000.DOLLARS)
WalletFiller.fillWithSomeTestCash(bobNode.services, DUMMY_NOTARY, 2000.DOLLARS)
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
notaryNode.info.identity, null).second
@ -144,7 +144,7 @@ class TwoPartyTradeProtocolTests {
net.runNetwork() // Clear network map registration messages
(bobNode.wallet as NodeWalletService).fillWithSomeTestCash(DUMMY_NOTARY, 2000.DOLLARS)
WalletFiller.fillWithSomeTestCash(bobNode.services, DUMMY_NOTARY, 2000.DOLLARS)
val alicesFakePaper = fillUpForSeller(false, aliceNode.storage.myLegalIdentity.owningKey,
notaryNode.info.identity, null).second
insertFakeTransactions(alicesFakePaper, aliceNode.services, aliceNode.storage.myLegalIdentityKey)

View File

@ -10,6 +10,7 @@ import com.r3corda.core.node.services.testing.MockKeyManagementService
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.testing.*
import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.node.internal.testing.WalletFiller
import com.r3corda.node.services.wallet.NodeWalletService
import org.junit.After
import org.junit.Before
@ -42,7 +43,7 @@ class NodeWalletServiceTest {
kms.nextKeys += Array(3) { ALICE_KEY }
// Fix the PRNG so that we get the same splits every time.
wallet.fillWithSomeTestCash(DUMMY_NOTARY, 100.DOLLARS, 3, 3, Random(0L))
WalletFiller.fillWithSomeTestCash(services, DUMMY_NOTARY, 100.DOLLARS, 3, 3, Random(0L))
val w = wallet.currentWallet
assertEquals(3, w.states.size)

View File

@ -20,12 +20,13 @@ import com.r3corda.core.utilities.BriefLogFormatter
import com.r3corda.core.utilities.Emoji
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node
import com.r3corda.node.internal.testing.WalletFiller
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.node.services.wallet.NodeWalletService
import com.r3corda.node.utilities.ANSIProgressRenderer
import com.r3corda.protocols.NotaryProtocol
import com.r3corda.protocols.TwoPartyTradeProtocol
import com.typesafe.config.ConfigFactory
@ -221,7 +222,7 @@ class TraderDemoProtocolBuyer(private val attachmentsPath: Path, val notary: Par
// Self issue some cash.
//
// TODO: At some point this demo should be extended to have a central bank node.
(serviceHub.walletService as NodeWalletService).fillWithSomeTestCash(notary, 3000.DOLLARS)
WalletFiller.fillWithSomeTestCash(serviceHub, notary, 3000.DOLLARS)
while (true) {
// Wait around until a node asks to start a trade with us. In a real system, this part would happen out of band