Merged in rnicoll-notify-tx (pull request #282)

Add observable for transactions being stored
This commit is contained in:
Ross Nicoll 2016-08-16 14:50:55 +01:00
commit a9ec3c253e
7 changed files with 50 additions and 0 deletions

View File

@ -11,6 +11,12 @@ interface ReadOnlyTransactionStorage {
* Return the transaction with the given [id], or null if no such transaction exists.
*/
fun getTransaction(id: SecureHash): SignedTransaction?
/**
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the Wallet will already incorporate
* the update.
*/
val updates: rx.Observable<SignedTransaction>
}
/**

View File

@ -15,6 +15,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.testing.DUMMY_NOTARY
import com.r3corda.core.testing.MEGA_CORP
import com.r3corda.core.testing.MINI_CORP
import rx.Observable
import rx.subjects.PublishSubject
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
@ -116,9 +118,19 @@ class MockAttachmentStorage : AttachmentStorage {
open class MockTransactionStorage : TransactionStorage {
private val txns = HashMap<SecureHash, SignedTransaction>()
private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
override val updates: Observable<SignedTransaction>
get() = _updatesPublisher
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
override fun addTransaction(transaction: SignedTransaction) {
txns[transaction.id] = transaction
notify(transaction)
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]
}

View File

@ -12,6 +12,7 @@ import java.util.*
* Events triggered by changes in the node, and sent to monitoring client(s).
*/
sealed class ServiceToClientEvent(val time: Instant) {
class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time)
class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time)
class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time)
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time)

View File

@ -62,6 +62,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) }
// Notify listeners on state changes
services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) }
services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) }
smm.changes.subscribe { change ->
val fiberId: Long = change.third
@ -86,6 +87,10 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
internal fun notifyWalletUpdate(update: Wallet.Update)
= notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced))
@VisibleForTesting
internal fun notifyTransaction(transaction: SignedTransaction)
= notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction))
private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) {
val req = reqMessage.command
val result: TransactionBuildResult? =

View File

@ -7,6 +7,8 @@ import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
@ -25,6 +27,13 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
private val _transactions = ConcurrentHashMap<SecureHash, SignedTransaction>()
private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
override val updates: Observable<SignedTransaction>
get() = _updatesPublisher
private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
init {
logger.trace { "Initialising per file transaction storage on $storeDir" }
Files.createDirectories(storeDir)
@ -39,6 +48,7 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
transaction.serialize().writeToFile(transactionFile)
_transactions[transaction.id] = transaction
logger.trace { "Stored $transaction to $transactionFile" }
notify(transaction)
}
override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id]

View File

@ -28,6 +28,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.file.Path
@ -503,6 +504,8 @@ class TwoPartyTradeProtocolTests {
class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage {
val records: MutableList<TxRecord> = Collections.synchronizedList(ArrayList<TxRecord>())
override val updates: Observable<SignedTransaction>
get() = delegate.updates
override fun addTransaction(transaction: SignedTransaction) {
records.add(TxRecord.Add(transaction))

View File

@ -1,5 +1,6 @@
package com.r3corda.node.services.persistence
import co.paralleluniverse.strands.SettableFuture
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints
@ -12,6 +13,8 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class PerFileTransactionStorageTests {
@ -69,6 +72,16 @@ class PerFileTransactionStorageTests {
assertThat(transactionStorage.transactions).containsExactly(transactions)
}
@Test
fun `updates are fired`() {
val future = SettableFuture<SignedTransaction>()
transactionStorage.updates.subscribe { tx -> future.set(tx) }
val expected = newTransaction()
transactionStorage.addTransaction(expected)
val actual = future.get(1, TimeUnit.SECONDS)
assertEquals(expected, actual)
}
private fun newTransactionStorage() {
transactionStorage = PerFileTransactionStorage(storeDir)
}