Add database transactions back onto observations. (#77)

Add database transactions back onto Observables now that they are post database commit.
This commit is contained in:
Rick Parker 2016-12-22 10:03:22 +00:00 committed by GitHub
parent ffe1d234ca
commit bd979534f3
9 changed files with 235 additions and 71 deletions

View File

@ -15,11 +15,9 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.serialization.serialize
import net.corda.node.services.messaging.requirePermission
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.messaging.createRPCKryo
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
@ -27,12 +25,8 @@ import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import rx.Observable
import java.io.BufferedInputStream
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
import java.time.Instant
import java.time.LocalDateTime
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
@ -46,7 +40,9 @@ class CordaRPCOpsImpl(
override val protocolVersion: Int get() = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
return services.networkMapCache.track()
return databaseTransaction(database) {
services.networkMapCache.track()
}
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
@ -63,11 +59,13 @@ class CordaRPCOpsImpl(
}
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
val (allStateMachines, changes) = smm.track()
return Pair(
allStateMachines.map { stateMachineInfoFromFlowLogic(it.id, it.logic) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
return databaseTransaction(database) {
val (allStateMachines, changes) = smm.track()
Pair(
allStateMachines.map { stateMachineInfoFromFlowLogic(it.id, it.logic) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
}
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {

View File

@ -26,6 +26,7 @@ import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import rx.Observable
import rx.subjects.PublishSubject
import java.security.SignatureException
@ -44,7 +45,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override val partyNodes: List<NodeInfo> get() = registeredNodes.map { it.value }
override val networkMapNodes: List<NodeInfo> get() = getNodesWithService(NetworkMapService.type)
private val _changed = PublishSubject.create<MapChange>()
override val changed: Observable<MapChange> get() = _changed
// We use assignment here so that multiple subscribers share the same wrapped Observable.
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = SettableFuture.create<Unit>()
@ -70,7 +72,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> {
synchronized(_changed) {
return Pair(partyNodes, _changed.bufferUntilSubscribed())
return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -58,7 +58,7 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
mutex.locked {
return Pair(
stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) },
updates.bufferUntilSubscribed()
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()
)
}
}

View File

@ -59,12 +59,11 @@ class DBTransactionStorage : TransactionStorage {
}
val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction>
get() = updatesPublisher
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
synchronized(txStorage) {
return Pair(txStorage.values.toList(), updates.bufferUntilSubscribed())
return Pair(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -9,8 +9,7 @@ import com.esotericsoftware.kryo.Kryo
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.abbreviate
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowLogic
@ -18,9 +17,7 @@ import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.TopicSession
import net.corda.core.messaging.send
import net.corda.core.random63BitValue
import net.corda.core.serialization.*
import net.corda.core.then
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
@ -30,15 +27,11 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiated
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState.Initiating
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.isolatedTransaction
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
@ -142,9 +135,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
/**
* An observable that emits triples of the changing flow, the type of change, and a process-specific ID number
* which may change across restarts.
*
* We use assignment here so that multiple subscribers share the same wrapped Observable.
*/
val changes: Observable<Change>
get() = mutex.content.changesPublisher
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
@ -188,9 +182,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*/
fun track(): Pair<List<FlowStateMachineImpl<*>>, Observable<Change>> {
return mutex.locked {
val bufferedChanges = UnicastSubject.create<Change>()
changesPublisher.subscribe(bufferedChanges)
Pair(stateMachines.keys.toList(), bufferedChanges)
Pair(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -100,6 +100,8 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
val _updatesPublisher = PublishSubject.create<Vault.Update>()
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
val _updatesInDbTx = _updatesPublisher.wrapWithDatabaseTransaction().asObservable()
// For use during publishing only.
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
@ -153,11 +155,11 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
get() = mutex.locked { _rawUpdatesPublisher }
override val updates: Observable<Vault.Update>
get() = mutex.locked { _updatesPublisher }
get() = mutex.locked { _updatesInDbTx }
override fun track(): Pair<Vault, Observable<Vault.Update>> {
return mutex.locked {
Pair(Vault(allUnconsumedStates()), _updatesPublisher.bufferUntilSubscribed())
Pair(Vault(allUnconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -12,6 +12,7 @@ import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionInterface
import org.jetbrains.exposed.sql.transactions.TransactionManager
import rx.Observable
import rx.Subscriber
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.io.Closeable
@ -23,6 +24,7 @@ import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
/**
* Table prefix for all tables owned by the node module.
@ -188,6 +190,75 @@ fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
return subject
}
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
private class DatabaseTransactionWrappingSubscriber<U>(val db: Database?) : Subscriber<U>() {
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
databaseTransaction(db ?: StrandLocalTransactionManager.database) {
delegates.filter { !it.isUnsubscribed }.forEach {
it.block()
}
}
}
override fun onCompleted() {
forEachSubscriberWithDbTx { onCompleted() }
}
override fun onError(e: Throwable?) {
forEachSubscriberWithDbTx { onError(e) }
}
override fun onNext(s: U) {
forEachSubscriberWithDbTx { onNext(s) }
}
override fun onStart() {
forEachSubscriberWithDbTx { onStart() }
}
fun cleanUp() {
if (delegates.removeIf { it.isUnsubscribed }) {
if (delegates.isEmpty()) {
unsubscribe()
}
}
}
}
// A subscriber that wraps another but does not pass on observations to it.
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
override fun onNext(s: U) {
}
}
/**
* Wrap delivery of observations in a database transaction. Multiple subscribers will receive the observations inside
* the same database transaction. This also lazily subscribes to the source [rx.Observable] to preserve any buffering
* that might be in place.
*/
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null): rx.Observable<T> {
val wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
// Add the subscriber to the wrapping subscriber, which will invoke the original subscribers together inside a database transaction.
wrappingSubscriber.delegates.add(toBeWrappedInDbTx)
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber<T>(toBeWrappedInDbTx)
// Clean up the shared list of subscribers when they unsubscribe.
}.doOnUnsubscribe { wrappingSubscriber.cleanUp() }
}
// Composite columns for use with below Exposed helpers.
data class PartyColumns(val name: Column<String>, val owningKey: Column<CompositeKey>)
data class StateRefColumns(val txId: Column<SecureHash>, val index: Column<Int>)

View File

@ -50,9 +50,11 @@ class CordaRPCOpsImplTest {
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(startFlowPermission<CashFlow>())))
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
vaultUpdates = rpc.vaultAndUpdates().second
databaseTransaction(aliceNode.database) {
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
vaultUpdates = rpc.vaultAndUpdates().second
}
}
@Test

View File

@ -1,24 +1,43 @@
package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.bufferUntilSubscribed
import net.corda.core.tee
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.junit.After
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.util.*
class ObservablesTests {
private fun isInDatabaseTransaction(): Boolean = (TransactionManager.currentOrNull() != null)
val toBeClosed = mutableListOf<Closeable>()
fun createDatabase(): Database {
val (closeable, database) = configureDatabase(makeTestDataSourceProperties())
toBeClosed += closeable
return database
}
@After
fun after() {
toBeClosed.forEach { it.close() }
toBeClosed.clear()
}
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val database = createDatabase()
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val source = PublishSubject.create<Int>()
val observable: Observable<Int> = source
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
@ -27,10 +46,10 @@ class ObservablesTests {
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
val delayedSubject = source.bufferUntilDatabaseCommit()
assertThat(source).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
subject.onNext(1)
source.onNext(1)
assertThat(firstEvent.isDone).isTrue()
assertThat(secondEvent.isDone).isFalse()
}
@ -38,16 +57,14 @@ class ObservablesTests {
assertThat(firstEvent.get()).isEqualTo(1 to true)
assertThat(secondEvent.get()).isEqualTo(0 to false)
toBeClosed.close()
}
@Test
fun `bufferUntilDatabaseCommit delays until transaction closed repeatable`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val database = createDatabase()
val subject = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val source = PublishSubject.create<Int>()
val observable: Observable<Int> = source
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val secondEvent = SettableFuture.create<Pair<Int, Boolean>>()
@ -56,8 +73,8 @@ class ObservablesTests {
observable.skip(1).first().subscribe { secondEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
val delayedSubject = source.bufferUntilDatabaseCommit()
assertThat(source).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(secondEvent.isDone).isFalse()
@ -67,33 +84,31 @@ class ObservablesTests {
assertThat(secondEvent.isDone).isFalse()
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit()
assertThat(subject).isNotEqualTo(delayedSubject)
val delayedSubject = source.bufferUntilDatabaseCommit()
assertThat(source).isNotEqualTo(delayedSubject)
delayedSubject.onNext(1)
assertThat(secondEvent.isDone).isFalse()
}
assertThat(secondEvent.isDone).isTrue()
assertThat(secondEvent.get()).isEqualTo(1 to false)
toBeClosed.close()
}
@Test
fun `tee correctly copies observations to multiple observers`() {
val subject1 = PublishSubject.create<Int>()
val subject2 = PublishSubject.create<Int>()
val subject3 = PublishSubject.create<Int>()
val source1 = PublishSubject.create<Int>()
val source2 = PublishSubject.create<Int>()
val source3 = PublishSubject.create<Int>()
val event1 = SettableFuture.create<Int>()
val event2 = SettableFuture.create<Int>()
val event3 = SettableFuture.create<Int>()
subject1.subscribe { event1.set(it) }
subject2.subscribe { event2.set(it) }
subject3.subscribe { event3.set(it) }
source1.subscribe { event1.set(it) }
source2.subscribe { event2.set(it) }
source3.subscribe { event3.set(it) }
val tee = subject1.tee(subject2, subject3)
val tee = source1.tee(source2, source3)
tee.onNext(0)
assertThat(event1.isDone).isTrue()
@ -104,19 +119,19 @@ class ObservablesTests {
assertThat(event3.get()).isEqualTo(0)
tee.onCompleted()
assertThat(subject1.hasCompleted()).isTrue()
assertThat(subject2.hasCompleted()).isTrue()
assertThat(subject3.hasCompleted()).isTrue()
assertThat(source1.hasCompleted()).isTrue()
assertThat(source2.hasCompleted()).isTrue()
assertThat(source3.hasCompleted()).isTrue()
}
@Test
fun `combine tee and bufferUntilDatabaseCommit`() {
val (toBeClosed, database) = configureDatabase(makeTestDataSourceProperties())
val database = createDatabase()
val subject = PublishSubject.create<Int>()
val source = PublishSubject.create<Int>()
val teed = PublishSubject.create<Int>()
val observable: Observable<Int> = subject
val observable: Observable<Int> = source
val firstEvent = SettableFuture.create<Pair<Int, Boolean>>()
val teedEvent = SettableFuture.create<Pair<Int, Boolean>>()
@ -126,8 +141,8 @@ class ObservablesTests {
teed.first().subscribe { teedEvent.set(it to isInDatabaseTransaction()) }
databaseTransaction(database) {
val delayedSubject = subject.bufferUntilDatabaseCommit().tee(teed)
assertThat(subject).isNotEqualTo(delayedSubject)
val delayedSubject = source.bufferUntilDatabaseCommit().tee(teed)
assertThat(source).isNotEqualTo(delayedSubject)
delayedSubject.onNext(0)
assertThat(firstEvent.isDone).isFalse()
assertThat(teedEvent.isDone).isTrue()
@ -136,7 +151,90 @@ class ObservablesTests {
assertThat(firstEvent.get()).isEqualTo(0 to false)
assertThat(teedEvent.get()).isEqualTo(0 to true)
}
toBeClosed.close()
@Test
fun `new transaction open in observer when wrapped`() {
val database = createDatabase()
val source = PublishSubject.create<Int>()
val observableWithDbTx: Observable<Int> = source.wrapWithDatabaseTransaction()
val undelayedEvent = SettableFuture.create<Pair<Int, Boolean>>()
val delayedEventFromSecondObserver = SettableFuture.create<Pair<Int, UUID?>>()
val delayedEventFromThirdObserver = SettableFuture.create<Pair<Int, UUID?>>()
observableWithDbTx.first().subscribe { undelayedEvent.set(it to isInDatabaseTransaction()) }
fun observeSecondEvent(event: Int, future: SettableFuture<Pair<Int, UUID?>>) {
future.set(event to if (isInDatabaseTransaction()) StrandLocalTransactionManager.transactionId else null)
}
observableWithDbTx.skip(1).first().subscribe { observeSecondEvent(it, delayedEventFromSecondObserver) }
observableWithDbTx.skip(1).first().subscribe { observeSecondEvent(it, delayedEventFromThirdObserver) }
databaseTransaction(database) {
val commitDelayedSource = source.bufferUntilDatabaseCommit()
assertThat(source).isNotEqualTo(commitDelayedSource)
commitDelayedSource.onNext(0)
source.onNext(1)
assertThat(undelayedEvent.isDone).isTrue()
assertThat(undelayedEvent.get()).isEqualTo(1 to true)
assertThat(delayedEventFromSecondObserver.isDone).isFalse()
}
assertThat(delayedEventFromSecondObserver.isDone).isTrue()
assertThat(delayedEventFromSecondObserver.get().first).isEqualTo(0)
assertThat(delayedEventFromSecondObserver.get().second).isNotNull()
assertThat(delayedEventFromThirdObserver.get().first).isEqualTo(0)
assertThat(delayedEventFromThirdObserver.get().second).isNotNull()
// Test that the two observers of the second event were notified inside the same database transaction.
assertThat(delayedEventFromSecondObserver.get().second).isEqualTo(delayedEventFromThirdObserver.get().second)
}
@Test
fun `check wrapping in db tx doesn't eagerly subscribe`() {
val database = createDatabase()
val source = PublishSubject.create<Int>()
var subscribed = false
val event = SettableFuture.create<Int>()
val bufferedObservable: Observable<Int> = source.bufferUntilSubscribed().doOnSubscribe { subscribed = true }
val databaseWrappedObservable: Observable<Int> = bufferedObservable.wrapWithDatabaseTransaction(database)
source.onNext(0)
assertThat(subscribed).isFalse()
assertThat(event.isDone).isFalse()
databaseWrappedObservable.first().subscribe { event.set(it) }
source.onNext(1)
assertThat(event.isDone).isTrue()
assertThat(event.get()).isEqualTo(0)
}
@Test
fun `check wrapping in db tx unsubscribes`() {
val database = createDatabase()
val source = PublishSubject.create<Int>()
var unsubscribed = false
val bufferedObservable: Observable<Int> = source.bufferUntilSubscribed().doOnUnsubscribe { unsubscribed = true }
val databaseWrappedObservable: Observable<Int> = bufferedObservable.wrapWithDatabaseTransaction(database)
assertThat(unsubscribed).isFalse()
val subscription1 = databaseWrappedObservable.subscribe { }
val subscription2 = databaseWrappedObservable.subscribe { }
subscription1.unsubscribe()
assertThat(unsubscribed).isFalse()
subscription2.unsubscribe()
assertThat(unsubscribed).isTrue()
}
}