From 592ac07af04e77a5da9923129976837f562e9103 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Fri, 6 Jan 2017 10:26:44 +0000 Subject: [PATCH] Upgrade reactivex.rxjava version to latest 1.x (#111) Update rxjava version and fix thread safety of database transaction boundary subject. --- core/build.gradle | 3 +-- .../kotlin/net/corda/node/utilities/DatabaseSupport.kt | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/build.gradle b/core/build.gradle index f437155d1e..2afd7e2634 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -63,8 +63,7 @@ dependencies { compile "com.google.guava:guava:$guava_version" // RxJava: observable streams of events. - // TODO: We can't upgrade past 1.1.6 due to a behaviour change in RxJava breaking our code. See PR #99 for discussion. Resolve. - compile "io.reactivex:rxjava:1.1.6" + compile "io.reactivex:rxjava:1.2.4" // Kryo: object graph serialization. compile "com.esotericsoftware:kryo:4.0.0" diff --git a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt index 8fa8727a36..0fc0f38f9a 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt @@ -14,6 +14,7 @@ import org.jetbrains.exposed.sql.transactions.TransactionManager import rx.Observable import rx.Subscriber import rx.subjects.PublishSubject +import rx.subjects.Subject import rx.subjects.UnicastSubject import java.io.Closeable import java.security.PublicKey @@ -109,13 +110,13 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan val manager: StrandLocalTransactionManager get() = databaseToInstance[database]!! - val transactionBoundaries: PublishSubject get() = manager._transactionBoundaries + val transactionBoundaries: Subject get() = manager._transactionBoundaries } data class Boundary(val txId: UUID) - private val _transactionBoundaries = PublishSubject.create() + private val _transactionBoundaries = PublishSubject.create().toSerialized() init { // Found a unit test that was forgetting to close the database transactions. When you close() on the top level @@ -140,7 +141,7 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan override fun currentOrNull(): Transaction? = threadLocalTx.get() // Direct copy of [ThreadLocalTransaction]. - private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal, val transactionBoundaries: PublishSubject) : TransactionInterface { + private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal, val transactionBoundaries: Subject) : TransactionInterface { val id = UUID.randomUUID() override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) { @@ -190,7 +191,6 @@ fun rx.Observer.bufferUntilDatabaseCommit(): rx.Observer { return subject } - // A subscriber that delegates to multiple others, wrapping a database transaction around the combination. private class DatabaseTransactionWrappingSubscriber(val db: Database?) : Subscriber() { // Some unsubscribes happen inside onNext() so need something that supports concurrent modification.