diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt index 31079b8b1b..e3479e0cd9 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt @@ -11,6 +11,12 @@ interface ReadOnlyTransactionStorage { * Return the transaction with the given [id], or null if no such transaction exists. */ fun getTransaction(id: SecureHash): SignedTransaction? + + /** + * Get a synchronous Observable of updates. When observations are pushed to the Observer, the Wallet will already incorporate + * the update. + */ + val updates: rx.Observable } /** diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index d5c6665dde..264ae2d5ba 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -15,6 +15,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.testing.DUMMY_NOTARY import com.r3corda.core.testing.MEGA_CORP import com.r3corda.core.testing.MINI_CORP +import rx.Observable +import rx.subjects.PublishSubject import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.File @@ -116,9 +118,19 @@ class MockAttachmentStorage : AttachmentStorage { open class MockTransactionStorage : TransactionStorage { private val txns = HashMap() + + private val _updatesPublisher = PublishSubject.create() + + override val updates: Observable + get() = _updatesPublisher + + private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction) + override fun addTransaction(transaction: SignedTransaction) { txns[transaction.id] = transaction + notify(transaction) } + override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id] } diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt index 3cc6fb8ef8..1f9e80307e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt @@ -12,6 +12,7 @@ import java.util.* * Events triggered by changes in the node, and sent to monitoring client(s). */ sealed class ServiceToClientEvent(val time: Instant) { + class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) class OutputState(time: Instant, val consumed: Set, val produced: Set>) : ServiceToClientEvent(time) class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time) class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time) diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt index 5b901af5fe..5130462de8 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt @@ -62,6 +62,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) } // Notify listeners on state changes + services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) } services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) } smm.changes.subscribe { change -> val fiberId: Long = change.third @@ -86,6 +87,10 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, internal fun notifyWalletUpdate(update: Wallet.Update) = notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced)) + @VisibleForTesting + internal fun notifyTransaction(transaction: SignedTransaction) + = notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction)) + private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) { val req = reqMessage.command val result: TransactionBuildResult? = diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt index 6e063c5181..4437733992 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt @@ -7,6 +7,8 @@ import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace +import rx.Observable +import rx.subjects.PublishSubject import java.nio.file.Files import java.nio.file.Path import java.util.concurrent.ConcurrentHashMap @@ -25,6 +27,13 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage { private val _transactions = ConcurrentHashMap() + private val _updatesPublisher = PublishSubject.create() + + override val updates: Observable + get() = _updatesPublisher + + private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction) + init { logger.trace { "Initialising per file transaction storage on $storeDir" } Files.createDirectories(storeDir) @@ -39,6 +48,7 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage { transaction.serialize().writeToFile(transactionFile) _transactions[transaction.id] = transaction logger.trace { "Stored $transaction to $transactionFile" } + notify(transaction) } override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id] diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index d991fb60f3..082061255d 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -28,6 +28,7 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test +import rx.Observable import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.nio.file.Path @@ -492,6 +493,8 @@ class TwoPartyTradeProtocolTests { class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage { val records: MutableList = Collections.synchronizedList(ArrayList()) + override val updates: Observable + get() = delegate.updates override fun addTransaction(transaction: SignedTransaction) { records.add(TxRecord.Add(transaction)) diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt index d7a347c69a..94ac5a89ab 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt @@ -1,5 +1,6 @@ package com.r3corda.node.services.persistence +import co.paralleluniverse.strands.SettableFuture import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.primitives.Ints @@ -12,6 +13,8 @@ import org.junit.After import org.junit.Before import org.junit.Test import java.nio.file.Files +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals class PerFileTransactionStorageTests { @@ -69,6 +72,16 @@ class PerFileTransactionStorageTests { assertThat(transactionStorage.transactions).containsExactly(transactions) } + @Test + fun `updates are fired`() { + val future = SettableFuture() + transactionStorage.updates.subscribe { tx -> future.set(tx) } + val expected = newTransaction() + transactionStorage.addTransaction(expected) + val actual = future.get(1, TimeUnit.SECONDS) + assertEquals(expected, actual) + } + private fun newTransactionStorage() { transactionStorage = PerFileTransactionStorage(storeDir) }