Merge pull request #33 from corda/parkri-external-observations

Buffer observations until database commit.
This commit is contained in:
Rick Parker 2016-12-08 16:34:50 +00:00 committed by GitHub
commit 70dcab6361
19 changed files with 280 additions and 40 deletions

View File

@ -13,6 +13,8 @@ import kotlinx.support.jdk7.use
import net.corda.core.crypto.newSecureRandom
import org.slf4j.Logger
import rx.Observable
import rx.Observer
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.io.BufferedInputStream
import java.io.InputStream
@ -363,5 +365,15 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
return subject.doOnUnsubscribe { subscription.unsubscribe() }
}
/**
* Copy an [Observer] to multiple other [Observer]s.
*/
fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
val subject = PublishSubject.create<T>()
subject.subscribe(this)
teeTo.forEach { subject.subscribe(it) }
return subject
}
/** Allows summing big decimals that are in iterable collections */
fun Iterable<BigDecimal>.sum(): BigDecimal = fold(BigDecimal.ZERO) { a, b -> a + b }

View File

@ -100,8 +100,19 @@ interface VaultService {
val currentVault: Vault
/**
* Prefer the use of [updates] unless you know why you want to use this instead.
*
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the Vault will already incorporate
* the update.
* the update, and the database transaction associated with the update will still be open and current. If for some
* reason the processing crosses outside of the database transaction (for example, the update is pushed outside the current
* JVM or across to another [Thread] which is executing in a different database transaction) then the Vault may
* not incorporate the update due to racing with committing the current database transaction.
*/
val rawUpdates: Observable<Vault.Update>
/**
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the Vault will already incorporate
* the update, and the database transaction associated with the update will have been committed and closed.
*/
val updates: Observable<Vault.Update>

View File

@ -3,9 +3,7 @@ package net.corda.docs
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.*
import net.corda.core.getOrThrow
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
@ -18,7 +16,6 @@ import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
class FxTransactionBuildTutorialTest {
@ -69,13 +66,13 @@ class FxTransactionBuildTutorialTest {
printBalances()
// Setup some futures on the vaults to await the arrival of the exchanged funds at both nodes
val done2 = SettableFuture.create<Map<Currency, Amount<Currency>>>()
val done3 = SettableFuture.create<Map<Currency, Amount<Currency>>>()
val done2 = SettableFuture.create<Unit>()
val done3 = SettableFuture.create<Unit>()
val subs2 = nodeA.services.vaultService.updates.subscribe {
done2.set(nodeA.services.vaultService.cashBalances)
done2.set(Unit)
}
val subs3 = nodeB.services.vaultService.updates.subscribe {
done3.set(nodeB.services.vaultService.cashBalances)
done3.set(Unit)
}
// Now run the actual Fx exchange
val doIt = nodeA.services.startFlow(ForeignExchangeFlow("trade1",
@ -86,8 +83,14 @@ class FxTransactionBuildTutorialTest {
// wait for the flow to finish and the vault updates to be done
doIt.resultFuture.getOrThrow()
// Get the balances when the vault updates
val balancesA = done2.get()
val balancesB = done3.get()
done2.get()
val balancesA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.cashBalances
}
done3.get()
val balancesB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.cashBalances
}
subs2.unsubscribe()
subs3.unsubscribe()
println("BalanceA\n" + balancesA)

View File

@ -53,7 +53,7 @@ class WorkflowTransactionBuildTutorialTest {
net.stopNodes()
}
//@Test
@Test
fun `Run workflow to completion`() {
// Setup a vault subscriber to wait for successful upload of the proposal to NodeB
val done1 = SettableFuture.create<Unit>()

View File

@ -247,7 +247,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// TODO: this model might change but for now it provides some de-coupling
// Add vault observers
CashBalanceAsMetricsObserver(services)
CashBalanceAsMetricsObserver(services, database)
ScheduledActivityObserver(services)
HibernateObserver(services)

View File

@ -42,7 +42,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
abstract val schemaService: SchemaService
abstract override val networkService: MessagingServiceInternal
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the vault for further processing. This is intended for implementations to call from

View File

@ -13,7 +13,7 @@ import net.corda.node.services.api.ServiceHubInternal
*/
class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
services.vaultService.updates.subscribe { update ->
services.vaultService.rawUpdates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.flowLogicRefFactory) }
}

View File

@ -24,6 +24,7 @@ import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.bufferUntilDatabaseCommit
import rx.Observable
import rx.subjects.PublishSubject
import java.security.SignatureException
@ -42,7 +43,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
private val _changed = PublishSubject.create<MapChange>()
override val changed: Observable<MapChange> = _changed
override val changed: Observable<MapChange> get() = _changed
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = SettableFuture.create<Unit>()
override val mapServiceRegistered: ListenableFuture<Unit> get() = _registrationFuture
@ -91,9 +94,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
synchronized(_changed) {
val previousNode = registeredNodes.put(node.legalIdentity, node)
if (previousNode == null) {
_changed.onNext(MapChange.Added(node))
changePublisher.onNext(MapChange.Added(node))
} else if (previousNode != node) {
_changed.onNext(MapChange.Modified(node, previousNode))
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
}
}
@ -101,7 +104,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun removeNode(node: NodeInfo) {
synchronized(_changed) {
registeredNodes.remove(node.legalIdentity)
_changed.onNext(MapChange.Removed(node))
changePublisher.onNext(MapChange.Removed(node))
}
}

View File

@ -50,7 +50,7 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
mutex.locked {
stateMachineTransactionMap[transactionId] = stateMachineRunId
updates.onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}

View File

@ -40,7 +40,7 @@ class DBTransactionStorage : TransactionStorage {
val old = txStorage.get(transaction.id)
if (old == null) {
txStorage.put(transaction.id, transaction)
updatesPublisher.onNext(transaction)
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
true
} else {
false

View File

@ -35,7 +35,7 @@ class HibernateObserver(services: ServiceHubInternal) {
val sessionFactories = ConcurrentHashMap<MappedSchema, SessionFactory>()
init {
services.vaultService.updates.subscribe { persist(it.produced) }
services.vaultService.rawUpdates.subscribe { persist(it.produced) }
}
private fun sessionFactoryForSchema(schema: MappedSchema): SessionFactory {

View File

@ -29,6 +29,7 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.isolatedTransaction
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -93,7 +94,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val changesPublisher = PublishSubject.create<Change>()
fun notifyChangeObservers(psm: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
changesPublisher.onNext(Change(psm.logic, addOrRemove, psm.id))
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(psm.logic, addOrRemove, psm.id))
}
})
@ -393,13 +394,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* restarted with checkpointed state machines in the storage service.
*/
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
val fiber = createFiber(logic)
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.
// Committing in line here on a fresh context ensure we can progress.
isolatedTransaction(database) {
val fiber = isolatedTransaction(database) {
val fiber = createFiber(logic)
updateCheckpoint(fiber)
fiber
}
// If we are not started then our checkpoint will be picked up during start
mutex.locked {

View File

@ -3,12 +3,14 @@ package net.corda.node.services.vault
import com.codahale.metrics.Gauge
import net.corda.core.node.services.VaultService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import java.util.*
/**
* This class observes the vault and reflect current cash balances as exposed metrics in the monitoring service.
*/
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal, val database: Database) {
init {
// TODO: Need to consider failure scenarios. This needs to run if the TX is successfully recorded
serviceHubInternal.vaultService.updates.subscribe { update ->
@ -29,13 +31,15 @@ class CashBalanceAsMetricsObserver(val serviceHubInternal: ServiceHubInternal) {
//
// Note: exported as pennies.
val m = serviceHubInternal.monitoringService.metrics
for ((key, value) in vault.cashBalances) {
val metric = balanceMetrics.getOrPut(key) {
val newMetric = BalanceMetric()
m.register("VaultBalances.${key}Pennies", newMetric)
newMetric
databaseTransaction(database) {
for ((key, value) in vault.cashBalances) {
val metric = balanceMetrics.getOrPut(key) {
val newMetric = BalanceMetric()
m.register("VaultBalances.${key}Pennies", newMetric)
newMetric
}
metric.pennies = value.quantity
}
metric.pennies = value.quantity
}
}
}

View File

@ -12,6 +12,7 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.tee
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.loggerFor
@ -80,6 +81,9 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
}
val _updatesPublisher = PublishSubject.create<Vault.Update>()
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
// For use during publishing only.
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
// Order by txhash for if and when transaction storage has some caching.
@ -104,6 +108,9 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
override val currentVault: Vault get() = mutex.locked { Vault(allUnconsumedStates()) }
override val rawUpdates: Observable<Vault.Update>
get() = mutex.locked { _rawUpdatesPublisher }
override val updates: Observable<Vault.Update>
get() = mutex.locked { _updatesPublisher }
@ -127,7 +134,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
if (netDelta != Vault.NoUpdate) {
mutex.locked {
recordUpdate(netDelta)
_updatesPublisher.onNext(netDelta)
updatesPublisher.onNext(netDelta)
}
}
return currentVault

View File

@ -7,9 +7,13 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.crypto.toBase58String
import net.corda.node.utilities.StrandLocalTransactionManager.Boundary
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionInterface
import org.jetbrains.exposed.sql.transactions.TransactionManager
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.io.Closeable
import java.security.PublicKey
import java.sql.Connection
@ -18,6 +22,7 @@ import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.ConcurrentHashMap
/**
* Table prefix for all tables owned by the node module.
@ -66,12 +71,19 @@ fun <T> isolatedTransaction(database: Database, block: Transaction.() -> T): T {
* over each other. So here we use a companion object to hold them as [ThreadLocal] and [StrandLocalTransactionManager]
* is otherwise effectively stateless so it's replacement does not matter. The [ThreadLocal] is then set correctly and
* explicitly just prior to initiating a transaction in [databaseTransaction] and [createDatabaseTransaction] above.
*
* The [StrandLocalTransactionManager] instances have an [Observable] of the transaction close [Boundary]s which
* facilitates the use of [Observable.afterDatabaseCommit] to create event streams that only emit once the database
* transaction is closed and the data has been persisted and becomes visible to other observers.
*/
class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionManager {
companion object {
private val TX_ID = Key<UUID>()
private val threadLocalDb = ThreadLocal<Database>()
private val threadLocalTx = ThreadLocal<Transaction>()
private val databaseToInstance = ConcurrentHashMap<Database, StrandLocalTransactionManager>()
fun setThreadLocalTx(tx: Transaction?): Pair<Database?, Transaction?> {
val oldTx = threadLocalTx.get()
@ -89,10 +101,21 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
set(value: Database) {
threadLocalDb.set(value)
}
val transactionId: UUID
get() = threadLocalTx.get()?.getUserData(TX_ID) ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
val manager: StrandLocalTransactionManager get() = databaseToInstance[database]!!
val transactionBoundaries: PublishSubject<Boundary> get() = manager._transactionBoundaries
}
data class Boundary(val txId: UUID)
private val _transactionBoundaries = PublishSubject.create<Boundary>()
init {
database = initWithDatabase
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
// databae transaction open. The [databaseTransaction] helper above handles this in a finally clause for you
@ -100,16 +123,23 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
if (threadLocalTx.get() != null) {
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
}
database = initWithDatabase
databaseToInstance[database] = this
}
override fun newTransaction(isolation: Int): Transaction = Transaction(StrandLocalTransaction(database, isolation, threadLocalTx)).apply {
threadLocalTx.set(this)
override fun newTransaction(isolation: Int): Transaction {
val impl = StrandLocalTransaction(database, isolation, threadLocalTx, transactionBoundaries)
return Transaction(impl).apply {
threadLocalTx.set(this)
putUserData(TX_ID, impl.id)
}
}
override fun currentOrNull(): Transaction? = threadLocalTx.get()
// Direct copy of [ThreadLocalTransaction].
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>) : TransactionInterface {
private class StrandLocalTransaction(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<Transaction>, val transactionBoundaries: PublishSubject<Boundary>) : TransactionInterface {
val id = UUID.randomUUID()
override val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
db.connector().apply {
@ -133,13 +163,33 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
override fun close() {
connection.close()
threadLocal.set(outerTransaction)
if (outerTransaction == null) {
transactionBoundaries.onNext(Boundary(id))
}
}
}
}
/**
* Buffer observations until after the current database transaction has been closed. Observations are never
* dropped, simply delayed.
*
* Primarily for use by component authors to publish observations during database transactions without racing against
* closing the database transaction.
*
* For examples, see the call hierarchy of this function.
*/
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
val currentTxId = StrandLocalTransactionManager.transactionId
val databaseTxBoundary: Observable<StrandLocalTransactionManager.Boundary> = StrandLocalTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
val subject = UnicastSubject.create<T>()
subject.delaySubscription(databaseTxBoundary).subscribe(this)
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
return subject
}
// Composite columns for use with below Exposed helpers.
data class PartyColumns(val name: Column<String>, val owningKey: Column<CompositeKey>)
data class StateRefColumns(val txId: Column<SecureHash>, val index: Column<Int>)
data class TxnNoteColumns(val txId: Column<SecureHash>, val note: Column<String>)

View File

@ -8,6 +8,7 @@ import com.typesafe.config.ConfigFactory
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
import net.corda.core.messaging.Message
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper
@ -16,7 +17,6 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.core.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider

View File

@ -5,6 +5,7 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.expect
import net.corda.testing.node.MockNetwork
import org.junit.Test
@ -30,7 +31,9 @@ class InMemoryNetworkMapCacheTest {
// Node A currently knows only about itself, so this returns node A
assertEquals(nodeA.netMapCache.getNodeByCompositeKey(keyPair.public.composite), nodeA.info)
nodeA.netMapCache.addNode(nodeB.info)
databaseTransaction(nodeA.database) {
nodeA.netMapCache.addNode(nodeB.info)
}
// Now both nodes match, so it throws an error
expect<IllegalStateException> {
nodeA.netMapCache.getNodeByCompositeKey(keyPair.public.composite)

View File

@ -16,6 +16,7 @@ import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.junit.Before
@ -201,7 +202,9 @@ class InMemoryNetworkMapServiceTest : AbstractNetworkMapServiceTest() {
fun success() {
val (mapServiceNode, registerNode) = network.createTwoNodes()
val service = mapServiceNode.inNodeNetworkMapService!! as InMemoryNetworkMapService
success(mapServiceNode, registerNode, { service }, { })
databaseTransaction(mapServiceNode.database) {
success(mapServiceNode, registerNode, { service }, { })
}
}
@Test

View File

@ -0,0 +1,142 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.tee
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
class ObservablesTests {
private fun isInDatabaseTransaction(): Boolean = (TransactionManager.currentOrNull() != null)
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
subject.onNext(1)
assertThat(firstEvent.isDone).isTrue()
assertThat(secondEvent.isDone).isFalse()
}
assertThat(secondEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(1 to true)
assertThat(secondEvent.get()).isEqualTo(0 to false)
toBeClosed.close()
}
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(secondEvent.isDone).isFalse()
}
assertThat(firstEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(0 to false)
assertThat(secondEvent.isDone).isFalse()
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(1)
assertThat(secondEvent.isDone).isFalse()
}
assertThat(secondEvent.isDone).isTrue()
assertThat(secondEvent.get()).isEqualTo(1 to false)
toBeClosed.close()
}
@Test
fun `tee correctly copies observations to multiple observers`() {
val subject1 = PublishSubject.create<Int>()
val subject2 = PublishSubject.create<Int>()
val subject3 = PublishSubject.create<Int>()
val event1 = SettableFuture.create<Int>()
val event2 = SettableFuture.create<Int>()
val event3 = SettableFuture.create<Int>()
subject1.subscribe { event1.set(it) }
subject2.subscribe { event2.set(it) }
subject3.subscribe { event3.set(it) }
val tee = subject1.tee(subject2, subject3)
tee.onNext(0)
assertThat(event1.isDone).isTrue()
assertThat(event2.isDone).isTrue()
assertThat(event3.isDone).isTrue()
assertThat(event1.get()).isEqualTo(0)
assertThat(event2.get()).isEqualTo(0)
assertThat(event3.get()).isEqualTo(0)
tee.onCompleted()
assertThat(subject1.hasCompleted()).isTrue()
assertThat(subject2.hasCompleted()).isTrue()
assertThat(subject3.hasCompleted()).isTrue()
}
@Test
fun `combine tee and bufferUntilDatabaseCommit`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val subject = PublishSubject.create<Int>()
val teed = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val teedEvent = SettableFuture.create<Pair<Int, Boolean>>()
observable.first().subscribe { firstEvent.set(it to isInDatabaseTransaction()) }
teed.first().subscribe { teedEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit().tee(teed)
assertThat(subject).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(teedEvent.isDone).isTrue()
}
assertThat(firstEvent.isDone).isTrue()
assertThat(firstEvent.get()).isEqualTo(0 to false)
assertThat(teedEvent.get()).isEqualTo(0 to true)
toBeClosed.close()
}
}