Add observable for transactions being stored

Add observable for transactions being stored, so the UI can show transactions as they're received, rather than being
limited to the summarised version available from the wallet service.
This commit is contained in:
Ross Nicoll
2016-08-12 11:57:12 +01:00
parent 8c7de8a69c
commit 8346c58d4c
7 changed files with 50 additions and 0 deletions

View File

@ -11,6 +11,12 @@ interface ReadOnlyTransactionStorage {
* Return the transaction with the given [id], or null if no such transaction exists. * Return the transaction with the given [id], or null if no such transaction exists.
*/ */
fun getTransaction(id: SecureHash): SignedTransaction? fun getTransaction(id: SecureHash): SignedTransaction?
/**
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the Wallet will already incorporate
* the update.
*/
val updates: rx.Observable<SignedTransaction>
} }
/** /**

View File

@ -15,6 +15,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.testing.DUMMY_NOTARY import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.MINI_CORP import com.r3corda.core.testing.MINI_CORP
import rx.Observable
import rx.subjects.PublishSubject
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.File import java.io.File
@ -116,9 +118,19 @@ class MockAttachmentStorage : AttachmentStorage {
open class MockTransactionStorage : TransactionStorage { open class MockTransactionStorage : TransactionStorage {
private val txns = HashMap<SecureHash, SignedTransaction>() private val txns = HashMap<SecureHash, SignedTransaction>()
private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
override val updates: Observable<SignedTransaction>
get() = _updatesPublisher
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
override fun addTransaction(transaction: SignedTransaction) { override fun addTransaction(transaction: SignedTransaction) {
txns[transaction.id] = transaction txns[transaction.id] = transaction
notify(transaction)
} }
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id] override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]
} }

View File

@ -12,6 +12,7 @@ import java.util.*
* Events triggered by changes in the node, and sent to monitoring client(s). * Events triggered by changes in the node, and sent to monitoring client(s).
*/ */
sealed class ServiceToClientEvent(val time: Instant) { sealed class ServiceToClientEvent(val time: Instant) {
class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time)
class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time) class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time)
class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time) class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time)
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time)

View File

@ -62,6 +62,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) } addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) }
// Notify listeners on state changes // Notify listeners on state changes
services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) }
services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) } services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) }
smm.changes.subscribe { change -> smm.changes.subscribe { change ->
val fiberId: Long = change.third val fiberId: Long = change.third
@ -86,6 +87,10 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
internal fun notifyWalletUpdate(update: Wallet.Update) internal fun notifyWalletUpdate(update: Wallet.Update)
= notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced)) = notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced))
@VisibleForTesting
internal fun notifyTransaction(transaction: SignedTransaction)
= notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction))
private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) { private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) {
val req = reqMessage.command val req = reqMessage.command
val result: TransactionBuildResult? = val result: TransactionBuildResult? =

View File

@ -7,6 +7,8 @@ import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace import com.r3corda.core.utilities.trace
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -25,6 +27,13 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
private val _transactions = ConcurrentHashMap<SecureHash, SignedTransaction>() private val _transactions = ConcurrentHashMap<SecureHash, SignedTransaction>()
private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
override val updates: Observable<SignedTransaction>
get() = _updatesPublisher
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
init { init {
logger.trace { "Initialising per file transaction storage on $storeDir" } logger.trace { "Initialising per file transaction storage on $storeDir" }
Files.createDirectories(storeDir) Files.createDirectories(storeDir)
@ -39,6 +48,7 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
transaction.serialize().writeToFile(transactionFile) transaction.serialize().writeToFile(transactionFile)
_transactions[transaction.id] = transaction _transactions[transaction.id] = transaction
logger.trace { "Stored $transaction to $transactionFile" } logger.trace { "Stored $transaction to $transactionFile" }
notify(transaction)
} }
override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id] override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id]

View File

@ -28,6 +28,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import rx.Observable
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.nio.file.Path import java.nio.file.Path
@ -492,6 +493,8 @@ class TwoPartyTradeProtocolTests {
class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage { class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage {
val records: MutableList<TxRecord> = Collections.synchronizedList(ArrayList<TxRecord>()) val records: MutableList<TxRecord> = Collections.synchronizedList(ArrayList<TxRecord>())
override val updates: Observable<SignedTransaction>
get() = delegate.updates
override fun addTransaction(transaction: SignedTransaction) { override fun addTransaction(transaction: SignedTransaction) {
records.add(TxRecord.Add(transaction)) records.add(TxRecord.Add(transaction))

View File

@ -1,5 +1,6 @@
package com.r3corda.node.services.persistence package com.r3corda.node.services.persistence
import co.paralleluniverse.strands.SettableFuture
import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints import com.google.common.primitives.Ints
@ -12,6 +13,8 @@ import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.nio.file.Files import java.nio.file.Files
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class PerFileTransactionStorageTests { class PerFileTransactionStorageTests {
@ -69,6 +72,16 @@ class PerFileTransactionStorageTests {
assertThat(transactionStorage.transactions).containsExactly(transactions) assertThat(transactionStorage.transactions).containsExactly(transactions)
} }
@Test
fun `updates are fired`() {
val future = SettableFuture<SignedTransaction>()
transactionStorage.updates.subscribe { tx -> future.set(tx) }
val expected = newTransaction()
transactionStorage.addTransaction(expected)
val actual = future.get(1, TimeUnit.SECONDS)
assertEquals(expected, actual)
}
private fun newTransactionStorage() { private fun newTransactionStorage() {
transactionStorage = PerFileTransactionStorage(storeDir) transactionStorage = PerFileTransactionStorage(storeDir)
} }