diff --git a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt index 83c675a7b6..b5f8e62e7c 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt @@ -6,7 +6,6 @@ import com.zaxxer.hikari.HikariDataSource import net.corda.core.crypto.SecureHash import net.corda.core.crypto.parsePublicKeyBase58 import net.corda.core.crypto.toBase58String -import net.corda.core.identity.PartyAndCertificate import net.corda.node.utilities.StrandLocalTransactionManager.Boundary import org.bouncycastle.cert.X509CertificateHolder import org.h2.jdbc.JdbcBlob @@ -23,7 +22,6 @@ import java.io.Closeable import java.security.PublicKey import java.security.cert.CertPath import java.security.cert.CertificateFactory -import java.security.cert.X509Certificate import java.sql.Connection import java.time.Instant import java.time.LocalDate @@ -256,7 +254,7 @@ private class NoOpSubscriber(t: Subscriber) : Subscriber(t) { * that might be in place. */ fun rx.Observable.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable { - val wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) + var wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) // Use lift to add subscribers to a special subscriber that wraps a database transaction around observations. // Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber. return this.lift { toBeWrappedInDbTx: Subscriber -> @@ -265,7 +263,13 @@ fun rx.Observable.wrapWithDatabaseTransaction(db: Database? = null) // If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing. if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx) // Clean up the shared list of subscribers when they unsubscribe. - }.doOnUnsubscribe { wrappingSubscriber.cleanUp() } + }.doOnUnsubscribe { + wrappingSubscriber.cleanUp() + // If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again + if (wrappingSubscriber.delegates.isEmpty()) { + wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db) + } + } } // Composite columns for use with below Exposed helpers. diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index 72213773fe..8e0c666796 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -237,4 +237,36 @@ class ObservablesTests { subscription2.unsubscribe() assertThat(unsubscribed).isTrue() } + + @Test + fun `check wrapping in db tx restarts if we pass through zero subscribers`() { + val database = createDatabase() + + val source = PublishSubject.create() + var unsubscribed = false + + val bufferedObservable: Observable = source.doOnUnsubscribe { unsubscribed = true } + val databaseWrappedObservable: Observable = bufferedObservable.wrapWithDatabaseTransaction(database) + + assertThat(unsubscribed).isFalse() + + val subscription1 = databaseWrappedObservable.subscribe { } + val subscription2 = databaseWrappedObservable.subscribe { } + + subscription1.unsubscribe() + assertThat(unsubscribed).isFalse() + + subscription2.unsubscribe() + assertThat(unsubscribed).isTrue() + + val event = SettableFuture.create() + val subscription3 = databaseWrappedObservable.subscribe { event.set(it) } + + source.onNext(1) + + assertThat(event.isDone).isTrue() + assertThat(event.get()).isEqualTo(1) + + subscription3.unsubscribe() + } } \ No newline at end of file