From 0b2188d27bcbffa6d2cccfd3c0edca81c87abfe6 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 11 Jul 2017 14:13:52 +0100 Subject: [PATCH] Fix a bug in the wrapper code for the vault. If we dip down to zero subscribers, no future updates are streamed. This hasn't been seen historically, because the cash metrics observer is always present, but this will be moved out of node. --- .../corda/node/utilities/DatabaseSupport.kt | 12 ++++--- .../corda/node/utilities/ObservablesTests.kt | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt index 83c675a7b6..b5f8e62e7c 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt @@ -6,7 +6,6 @@ import com.zaxxer.hikari.HikariDataSource import net.corda.core.crypto.SecureHash import net.corda.core.crypto.parsePublicKeyBase58 import net.corda.core.crypto.toBase58String -import net.corda.core.identity.PartyAndCertificate import net.corda.node.utilities.StrandLocalTransactionManager.Boundary import org.bouncycastle.cert.X509CertificateHolder import org.h2.jdbc.JdbcBlob @@ -23,7 +22,6 @@ import java.io.Closeable import java.security.PublicKey import java.security.cert.CertPath import java.security.cert.CertificateFactory -import java.security.cert.X509Certificate import java.sql.Connection import java.time.Instant import java.time.LocalDate @@ -256,7 +254,7 @@ private class NoOpSubscriber(t: Subscriber) : Subscriber(t) { * that might be in place. */ fun rx.Observable.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable { - val wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) + var wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) // Use lift to add subscribers to a special subscriber that wraps a database transaction around observations. // Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber. return this.lift { toBeWrappedInDbTx: Subscriber -> @@ -265,7 +263,13 @@ fun rx.Observable.wrapWithDatabaseTransaction(db: Database? = null) // If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing. if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx) // Clean up the shared list of subscribers when they unsubscribe. - }.doOnUnsubscribe { wrappingSubscriber.cleanUp() } + }.doOnUnsubscribe { + wrappingSubscriber.cleanUp() + // If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again + if (wrappingSubscriber.delegates.isEmpty()) { + wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) + } + } } // Composite columns for use with below Exposed helpers. diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index 72213773fe..8e0c666796 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -237,4 +237,36 @@ class ObservablesTests { subscription2.unsubscribe() assertThat(unsubscribed).isTrue() } + + @Test + fun `check wrapping in db tx restarts if we pass through zero subscribers`() { + val database = createDatabase() + + val source = PublishSubject.create() + var unsubscribed = false + + val bufferedObservable: Observable = source.doOnUnsubscribe { unsubscribed = true } + val databaseWrappedObservable: Observable = bufferedObservable.wrapWithDatabaseTransaction(database) + + assertThat(unsubscribed).isFalse() + + val subscription1 = databaseWrappedObservable.subscribe { } + val subscription2 = databaseWrappedObservable.subscribe { } + + subscription1.unsubscribe() + assertThat(unsubscribed).isFalse() + + subscription2.unsubscribe() + assertThat(unsubscribed).isTrue() + + val event = SettableFuture.create() + val subscription3 = databaseWrappedObservable.subscribe { event.set(it) } + + source.onNext(1) + + assertThat(event.isDone).isTrue() + assertThat(event.get()).isEqualTo(1) + + subscription3.unsubscribe() + } } \ No newline at end of file