Fix a bug in the wrapper code for the vault.

If we dip down to zero subscribers, no future updates are streamed. This hasn't been seen historically, because the cash metrics observer is always present, but this will be moved out of node.
This commit is contained in:
Matthew Nesbit 2017-07-11 14:13:52 +01:00
parent 7caee508ec
commit 0b2188d27b
2 changed files with 40 additions and 4 deletions

View File

@ -6,7 +6,6 @@ import com.zaxxer.hikari.HikariDataSource
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58 import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String import net.corda.core.crypto.toBase58String
import net.corda.core.identity.PartyAndCertificate
import net.corda.node.utilities.StrandLocalTransactionManager.Boundary import net.corda.node.utilities.StrandLocalTransactionManager.Boundary
import org.bouncycastle.cert.X509CertificateHolder import org.bouncycastle.cert.X509CertificateHolder
import org.h2.jdbc.JdbcBlob import org.h2.jdbc.JdbcBlob
@ -23,7 +22,6 @@ import java.io.Closeable
import java.security.PublicKey import java.security.PublicKey
import java.security.cert.CertPath import java.security.cert.CertPath
import java.security.cert.CertificateFactory import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.sql.Connection import java.sql.Connection
import java.time.Instant import java.time.Instant
import java.time.LocalDate import java.time.LocalDate
@ -256,7 +254,7 @@ private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
* that might be in place. * that might be in place.
*/ */
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable<T> { fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable<T> {
val wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db) var wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations. // Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber. // Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
return this.lift { toBeWrappedInDbTx: Subscriber<in T> -> return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
@ -265,7 +263,13 @@ fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null)
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing. // If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx) if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
// Clean up the shared list of subscribers when they unsubscribe. // Clean up the shared list of subscribers when they unsubscribe.
}.doOnUnsubscribe { wrappingSubscriber.cleanUp() } }.doOnUnsubscribe {
wrappingSubscriber.cleanUp()
// If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again
if (wrappingSubscriber.delegates.isEmpty()) {
wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
}
}
} }
// Composite columns for use with below Exposed helpers. // Composite columns for use with below Exposed helpers.

View File

@ -237,4 +237,36 @@ class ObservablesTests {
subscription2.unsubscribe() subscription2.unsubscribe()
assertThat(unsubscribed).isTrue() assertThat(unsubscribed).isTrue()
} }
@Test
fun `check wrapping in db tx restarts if we pass through zero subscribers`() {
val database = createDatabase()
val source = PublishSubject.create<Int>()
var unsubscribed = false
val bufferedObservable: Observable<Int> = source.doOnUnsubscribe { unsubscribed = true }
val databaseWrappedObservable: Observable<Int> = bufferedObservable.wrapWithDatabaseTransaction(database)
assertThat(unsubscribed).isFalse()
val subscription1 = databaseWrappedObservable.subscribe { }
val subscription2 = databaseWrappedObservable.subscribe { }
subscription1.unsubscribe()
assertThat(unsubscribed).isFalse()
subscription2.unsubscribe()
assertThat(unsubscribed).isTrue()
val event = SettableFuture.create<Int>()
val subscription3 = databaseWrappedObservable.subscribe { event.set(it) }
source.onNext(1)
assertThat(event.isDone).isTrue()
assertThat(event.get()).isEqualTo(1)
subscription3.unsubscribe()
}
} }