Merge pull request #1014 from corda/mnesbit-allow-vault-resubscribe

Fix a bug in the wrapper code for the vault.
This commit is contained in:
Matthew Nesbit 2017-07-11 17:57:26 +01:00 committed by GitHub
commit ac07b3fe94
2 changed files with 40 additions and 4 deletions

View File

@ -6,7 +6,6 @@ import com.zaxxer.hikari.HikariDataSource
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String
import net.corda.core.identity.PartyAndCertificate
import net.corda.node.utilities.StrandLocalTransactionManager.Boundary
import org.bouncycastle.cert.X509CertificateHolder
import org.h2.jdbc.JdbcBlob
@ -23,7 +22,6 @@ import java.io.Closeable
import java.security.PublicKey
import java.security.cert.CertPath
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.sql.Connection
import java.time.Instant
import java.time.LocalDate
@ -256,7 +254,7 @@ private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
* that might be in place.
*/
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable<T> {
val wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
var wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
@ -265,7 +263,13 @@ fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null)
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
// Clean up the shared list of subscribers when they unsubscribe.
}.doOnUnsubscribe { wrappingSubscriber.cleanUp() }
}.doOnUnsubscribe {
wrappingSubscriber.cleanUp()
// If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again
if (wrappingSubscriber.delegates.isEmpty()) {
wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
}
}
}
// Composite columns for use with below Exposed helpers.

View File

@ -237,4 +237,36 @@ class ObservablesTests {
subscription2.unsubscribe()
assertThat(unsubscribed).isTrue()
}
@Test
fun `check wrapping in db tx restarts if we pass through zero subscribers`() {
val database = createDatabase()
val source = PublishSubject.create<Int>()
var unsubscribed = false
val bufferedObservable: Observable<Int> = source.doOnUnsubscribe { unsubscribed = true }
val databaseWrappedObservable: Observable<Int> = bufferedObservable.wrapWithDatabaseTransaction(database)
assertThat(unsubscribed).isFalse()
val subscription1 = databaseWrappedObservable.subscribe { }
val subscription2 = databaseWrappedObservable.subscribe { }
subscription1.unsubscribe()
assertThat(unsubscribed).isFalse()
subscription2.unsubscribe()
assertThat(unsubscribed).isTrue()
val event = SettableFuture.create<Int>()
val subscription3 = databaseWrappedObservable.subscribe { event.set(it) }
source.onNext(1)
assertThat(event.isDone).isTrue()
assertThat(event.get()).isEqualTo(1)
subscription3.unsubscribe()
}
}