From 8346c58d4caaa9e407e8e24cfd830bff653148ab Mon Sep 17 00:00:00 2001
From: Ross Nicoll <ross.nicoll@r3cev.com>
Date: Fri, 12 Aug 2016 11:57:12 +0100
Subject: [PATCH] Add observable for transactions being stored

Add observable for transactions being stored, so the UI can show transactions as they're received, rather than being
limited to the summarised version available from the wallet service.
---
 .../core/node/services/TransactionStorage.kt        |  6 ++++++
 .../core/node/services/testing/MockServices.kt      | 12 ++++++++++++
 .../com/r3corda/node/services/monitor/Events.kt     |  1 +
 .../node/services/monitor/WalletMonitorService.kt   |  5 +++++
 .../persistence/PerFileTransactionStorage.kt        | 10 ++++++++++
 .../node/messaging/TwoPartyTradeProtocolTests.kt    |  3 +++
 .../persistence/PerFileTransactionStorageTests.kt   | 13 +++++++++++++
 7 files changed, 50 insertions(+)

diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt
index 31079b8b1b..e3479e0cd9 100644
--- a/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt
+++ b/core/src/main/kotlin/com/r3corda/core/node/services/TransactionStorage.kt
@@ -11,6 +11,12 @@ interface ReadOnlyTransactionStorage {
      * Return the transaction with the given [id], or null if no such transaction exists.
      */
     fun getTransaction(id: SecureHash): SignedTransaction?
+
+    /**
+     * Get a synchronous Observable of updates.  When observations are pushed to the Observer, the Wallet will already incorporate
+     * the update.
+     */
+    val updates: rx.Observable<SignedTransaction>
 }
 
 /**
diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt
index d5c6665dde..264ae2d5ba 100644
--- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt
+++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt
@@ -15,6 +15,8 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
 import com.r3corda.core.testing.DUMMY_NOTARY
 import com.r3corda.core.testing.MEGA_CORP
 import com.r3corda.core.testing.MINI_CORP
+import rx.Observable
+import rx.subjects.PublishSubject
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.io.File
@@ -116,9 +118,19 @@ class MockAttachmentStorage : AttachmentStorage {
 
 open class MockTransactionStorage : TransactionStorage {
     private val txns = HashMap<SecureHash, SignedTransaction>()
+
+    private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
+
+    override val updates: Observable<SignedTransaction>
+        get() = _updatesPublisher
+
+    private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
+
     override fun addTransaction(transaction: SignedTransaction) {
         txns[transaction.id] = transaction
+        notify(transaction)
     }
+
     override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]
 }
 
diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt
index 3cc6fb8ef8..1f9e80307e 100644
--- a/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt
+++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/Events.kt
@@ -12,6 +12,7 @@ import java.util.*
  * Events triggered by changes in the node, and sent to monitoring client(s).
  */
 sealed class ServiceToClientEvent(val time: Instant) {
+    class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time)
     class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time)
     class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time)
     class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time)
diff --git a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt
index 5b901af5fe..5130462de8 100644
--- a/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt
+++ b/node/src/main/kotlin/com/r3corda/node/services/monitor/WalletMonitorService.kt
@@ -62,6 +62,7 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
         addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) }
 
         // Notify listeners on state changes
+        services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) }
         services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) }
         smm.changes.subscribe { change ->
             val fiberId: Long = change.third
@@ -86,6 +87,10 @@ class WalletMonitorService(net: MessagingService, val smm: StateMachineManager,
     internal fun notifyWalletUpdate(update: Wallet.Update)
             = notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced))
 
+    @VisibleForTesting
+    internal fun notifyTransaction(transaction: SignedTransaction)
+        = notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction))
+
     private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) {
         val req = reqMessage.command
         val result: TransactionBuildResult? =
diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt
index 6e063c5181..4437733992 100644
--- a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt
+++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt
@@ -7,6 +7,8 @@ import com.r3corda.core.serialization.deserialize
 import com.r3corda.core.serialization.serialize
 import com.r3corda.core.utilities.loggerFor
 import com.r3corda.core.utilities.trace
+import rx.Observable
+import rx.subjects.PublishSubject
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.concurrent.ConcurrentHashMap
@@ -25,6 +27,13 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
 
     private val _transactions = ConcurrentHashMap<SecureHash, SignedTransaction>()
 
+    private val _updatesPublisher = PublishSubject.create<SignedTransaction>()
+
+    override val updates: Observable<SignedTransaction>
+        get() = _updatesPublisher
+
+    private fun notify(transaction: SignedTransaction) = _updatesPublisher.onNext(transaction)
+
     init {
         logger.trace { "Initialising per file transaction storage on $storeDir" }
         Files.createDirectories(storeDir)
@@ -39,6 +48,7 @@ class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
         transaction.serialize().writeToFile(transactionFile)
         _transactions[transaction.id] = transaction
         logger.trace { "Stored $transaction to $transactionFile" }
+        notify(transaction)
     }
 
     override fun getTransaction(id: SecureHash): SignedTransaction? = _transactions[id]
diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt
index d991fb60f3..082061255d 100644
--- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt
+++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt
@@ -28,6 +28,7 @@ import org.assertj.core.api.Assertions.assertThat
 import org.junit.After
 import org.junit.Before
 import org.junit.Test
+import rx.Observable
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.nio.file.Path
@@ -492,6 +493,8 @@ class TwoPartyTradeProtocolTests {
     class RecordingTransactionStorage(val delegate: TransactionStorage) : TransactionStorage {
 
         val records: MutableList<TxRecord> = Collections.synchronizedList(ArrayList<TxRecord>())
+        override val updates: Observable<SignedTransaction>
+            get() = delegate.updates
 
         override fun addTransaction(transaction: SignedTransaction) {
             records.add(TxRecord.Add(transaction))
diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt
index d7a347c69a..94ac5a89ab 100644
--- a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt
+++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt
@@ -1,5 +1,6 @@
 package com.r3corda.node.services.persistence
 
+import co.paralleluniverse.strands.SettableFuture
 import com.google.common.jimfs.Configuration.unix
 import com.google.common.jimfs.Jimfs
 import com.google.common.primitives.Ints
@@ -12,6 +13,8 @@ import org.junit.After
 import org.junit.Before
 import org.junit.Test
 import java.nio.file.Files
+import java.util.concurrent.TimeUnit
+import kotlin.test.assertEquals
 
 class PerFileTransactionStorageTests {
 
@@ -69,6 +72,16 @@ class PerFileTransactionStorageTests {
         assertThat(transactionStorage.transactions).containsExactly(transactions)
     }
 
+    @Test
+    fun `updates are fired`() {
+        val future = SettableFuture<SignedTransaction>()
+        transactionStorage.updates.subscribe { tx -> future.set(tx) }
+        val expected = newTransaction()
+        transactionStorage.addTransaction(expected)
+        val actual = future.get(1, TimeUnit.SECONDS)
+        assertEquals(expected, actual)
+    }
+
     private fun newTransactionStorage() {
         transactionStorage = PerFileTransactionStorage(storeDir)
     }