Upgrade reactivex.rxjava version to latest 1.x (#111)

Update rxjava version and fix thread safety of database transaction boundary subject.
This commit is contained in:
Rick Parker 2017-01-06 10:26:44 +00:00 committed by GitHub
parent 6bb0a68d6d
commit 592ac07af0
2 changed files with 5 additions and 6 deletions

View File

@ -63,8 +63,7 @@ dependencies {
compile "com.google.guava:guava:$guava_version"
// RxJava: observable streams of events.
// TODO: We can't upgrade past 1.1.6 due to a behaviour change in RxJava breaking our code. See PR #99 for discussion. Resolve.
compile "io.reactivex:rxjava:1.1.6"
compile "io.reactivex:rxjava:1.2.4"
// Kryo: object graph serialization.
compile "com.esotericsoftware:kryo:4.0.0"

View File

@ -14,6 +14,7 @@ import org.jetbrains.exposed.sql.transactions.TransactionManager
import rx.Observable
import rx.Subscriber
import rx.subjects.PublishSubject
import rx.subjects.Subject
import rx.subjects.UnicastSubject
import java.io.Closeable
import java.security.PublicKey
@ -109,13 +110,13 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
val manager: StrandLocalTransactionManager get() = databaseToInstance[database]!!
val transactionBoundaries: PublishSubject<Boundary> get() = manager._transactionBoundaries
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
}
data class Boundary(val txId: UUID)
private val _transactionBoundaries = PublishSubject.create<Boundary>()
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
init {
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
@ -140,7 +141,7 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
override fun currentOrNull(): Transaction? = threadLocalTx.get()
// Direct copy of [ThreadLocalTransaction].
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>, val transactionBoundaries: PublishSubject<Boundary>) : TransactionInterface {
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>, val transactionBoundaries: Subject<Boundary, Boundary>) : TransactionInterface {
val id = UUID.randomUUID()
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
@ -190,7 +191,6 @@ fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
return subject
}
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
private class DatabaseTransactionWrappingSubscriber<U>(val db: Database?) : Subscriber<U>() {
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.