Replace kotlin Pair with DataFeed data class (#930)

* Replace kotlin Pair with DataFeed data class

* remove unintended changes

* minor fix

* address PR issues
This commit is contained in:
Patrick Kuo 2017-06-28 11:06:06 +01:00 committed by GitHub
parent 0aadc037ef
commit e02c37c06d
15 changed files with 90 additions and 54 deletions

View File

@ -4,13 +4,13 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable
/** /**
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you * A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
@ -215,10 +215,10 @@ abstract class FlowLogic<out T> {
* *
* @return Returns null if this flow has no progress tracker. * @return Returns null if this flow has no progress tracker.
*/ */
fun track(): Pair<String, Observable<String>>? { fun track(): DataFeed<String, String>? {
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe // TODO this is not threadsafe, needs an atomic get-step-and-subscribe
return progressTracker?.let { return progressTracker?.let {
it.currentStep.label to it.changes.map { it.toString() } DataFeed(it.currentStep.label, it.changes.map { it.toString() })
} }
} }

View File

@ -32,7 +32,7 @@ data class StateMachineInfo(
val id: StateMachineRunId, val id: StateMachineRunId,
val flowLogicClassName: String, val flowLogicClassName: String,
val initiator: FlowInitiator, val initiator: FlowInitiator,
val progressTrackerStepAndUpdates: Pair<String, Observable<String>>? val progressTrackerStepAndUpdates: DataFeed<String, String>?
) { ) {
override fun toString(): String = "${javaClass.simpleName}($id, $flowLogicClassName)" override fun toString(): String = "${javaClass.simpleName}($id, $flowLogicClassName)"
} }
@ -52,9 +52,6 @@ sealed class StateMachineUpdate {
* RPC operations that the node exposes to clients using the Java client library. These can be called from * RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class. * client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class.
*/ */
// TODO: The use of Pairs throughout is unfriendly for Java interop.
interface CordaRPCOps : RPCOps { interface CordaRPCOps : RPCOps {
/** /**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
@ -63,10 +60,13 @@ interface CordaRPCOps : RPCOps {
override val protocolVersion: Int get() = nodeIdentity().platformVersion override val protocolVersion: Int get() = nodeIdentity().platformVersion
/** /**
* Returns a pair of currently in-progress state machine infos and an observable of future state machine adds/removes. * Returns a data feed of currently in-progress state machine infos and an observable of future state machine adds/removes.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachinesFeed()"))
fun stateMachinesAndUpdates() = stateMachinesFeed()
/** /**
* Returns a snapshot of vault states for a given query criteria (and optional order and paging specification) * Returns a snapshot of vault states for a given query criteria (and optional order and paging specification)
@ -147,31 +147,41 @@ interface CordaRPCOps : RPCOps {
// DOCEND VaultTrackAPIHelpers // DOCEND VaultTrackAPIHelpers
/** /**
* Returns a pair of head states in the vault and an observable of future updates to the vault. * Returns a data feed of head states in the vault and an observable of future updates to the vault.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
// TODO: Remove this from the interface // TODO: Remove this from the interface
@Deprecated("This function will be removed in a future milestone", ReplaceWith("vaultTrackBy(QueryCriteria())")) @Deprecated("This function will be removed in a future milestone", ReplaceWith("vaultTrackBy(QueryCriteria())"))
fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> fun vaultAndUpdates(): DataFeed<List<StateAndRef<ContractState>>, Vault.Update>
/** /**
* Returns a pair of all recorded transactions and an observable of future recorded ones. * Returns a data feed of all recorded transactions and an observable of future recorded ones.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> fun verifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("verifiedTransactionFeed()"))
fun verifiedTransactions() = verifiedTransactionsFeed()
/** /**
* Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future * Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future
* such mappings as well. * such mappings as well.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachineRecordedTransactionMappingFeed()"))
fun stateMachineRecordedTransactionMapping() = stateMachineRecordedTransactionMappingFeed()
/** /**
* Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network. * Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
*/ */
@RPCReturnsObservables @RPCReturnsObservables
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("networkMapFeed()"))
fun networkMapUpdates() = networkMapFeed()
/** /**
* Start the given flow with the given arguments. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC]. * Start the given flow with the given arguments. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC].
@ -382,3 +392,14 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startTrac
arg2: C, arg2: C,
arg3: D arg3: D
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) ): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)
/**
* The Data feed contains a snapshot of the requested data and an [Observable] of future updates.
*/
@CordaSerializable
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>) {
@Deprecated("This function will be removed in a future milestone", ReplaceWith("snapshot"))
val first: A get() = snapshot
@Deprecated("This function will be removed in a future milestone", ReplaceWith("updates"))
val second: Observable<B> get() = updates
}

View File

@ -3,6 +3,7 @@ package net.corda.core.node.services
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.Contract import net.corda.core.contracts.Contract
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.randomOrNull import net.corda.core.randomOrNull
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
@ -48,7 +49,7 @@ interface NetworkMapCache {
* Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the
* first subscriber is registered so as to avoid racing with early updates. * first subscriber is registered so as to avoid racing with early updates.
*/ */
fun track(): Pair<List<NodeInfo>, Observable<MapChange>> fun track(): DataFeed<List<NodeInfo>, MapChange>
/** Get the collection of nodes which advertise a specific service. */ /** Get the collection of nodes which advertise a specific service. */
fun getNodesWithService(serviceType: ServiceType): List<NodeInfo> { fun getNodesWithService(serviceType: ServiceType): List<NodeInfo> {

View File

@ -3,7 +3,6 @@ package net.corda.core.node.services
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.* import net.corda.core.contracts.*
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -12,7 +11,9 @@ import net.corda.core.flows.FlowException
import net.corda.core.identity.AbstractParty import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
@ -189,7 +190,7 @@ interface VaultService {
*/ */
// TODO: Remove this from the interface // TODO: Remove this from the interface
@Deprecated("This function will be removed in a future milestone", ReplaceWith("trackBy(QueryCriteria())")) @Deprecated("This function will be removed in a future milestone", ReplaceWith("trackBy(QueryCriteria())"))
fun track(): Pair<Vault<ContractState>, Observable<Vault.Update>> fun track(): DataFeed<Vault<ContractState>, Vault.Update>
/** /**
* Return unconsumed [ContractState]s for a given set of [StateRef]s * Return unconsumed [ContractState]s for a given set of [StateRef]s

View File

@ -2,6 +2,7 @@ package net.corda.core.node.services
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import rx.Observable import rx.Observable
@ -14,5 +15,5 @@ data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRun
*/ */
interface StateMachineRecordedTransactionMappingStorage { interface StateMachineRecordedTransactionMappingStorage {
fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash)
fun track(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping>
} }

View File

@ -1,6 +1,7 @@
package net.corda.core.node.services package net.corda.core.node.services
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.DataFeed
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import rx.Observable import rx.Observable
@ -22,7 +23,7 @@ interface ReadOnlyTransactionStorage {
/** /**
* Returns all currently stored transactions and further fresh ones. * Returns all currently stored transactions and further fresh ones.
*/ */
fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> fun track(): DataFeed<List<SignedTransaction>, SignedTransaction>
} }
/** /**

View File

@ -14,7 +14,6 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.Sort
@ -43,16 +42,16 @@ class CordaRPCOpsImpl(
private val smm: StateMachineManager, private val smm: StateMachineManager,
private val database: Database private val database: Database
) : CordaRPCOps { ) : CordaRPCOps {
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> { override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
return database.transaction { return database.transaction {
services.networkMapCache.track() services.networkMapCache.track()
} }
} }
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> { override fun vaultAndUpdates(): DataFeed<List<StateAndRef<ContractState>>, Vault.Update> {
return database.transaction { return database.transaction {
val (vault, updates) = services.vaultService.track() val (vault, updates) = services.vaultService.track()
Pair(vault.states.toList(), updates) DataFeed(vault.states.toList(), updates)
} }
} }
@ -75,23 +74,23 @@ class CordaRPCOpsImpl(
} }
} }
override fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> { override fun verifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction { return database.transaction {
services.storageService.validatedTransactions.track() services.storageService.validatedTransactions.track()
} }
} }
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> { override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
return database.transaction { return database.transaction {
val (allStateMachines, changes) = smm.track() val (allStateMachines, changes) = smm.track()
Pair( DataFeed(
allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) }, allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) },
changes.map { stateMachineUpdateFromStateMachineChange(it) } changes.map { stateMachineUpdateFromStateMachineChange(it) }
) )
} }
} }
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> { override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return database.transaction { return database.transaction {
services.storageService.stateMachineRecordedTransactionMapping.track() services.storageService.stateMachineRecordedTransactionMapping.track()
} }

View File

@ -6,6 +6,7 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.map import net.corda.core.map
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.DEFAULT_SESSION_ID
@ -71,9 +72,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey] override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey]
override fun track(): Pair<List<NodeInfo>, Observable<MapChange>> { override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) { synchronized(_changed) {
return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -4,6 +4,7 @@ import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.node.utilities.* import net.corda.node.utilities.*
@ -55,9 +56,9 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
} }
} }
override fun track(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> { override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
mutex.locked { mutex.locked {
return Pair( return DataFeed(
stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) }, stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) },
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction() updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()
) )

View File

@ -3,6 +3,7 @@ package net.corda.node.services.persistence
import com.google.common.annotations.VisibleForTesting import com.google.common.annotations.VisibleForTesting
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.TransactionStorage import net.corda.core.node.services.TransactionStorage
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.utilities.* import net.corda.node.utilities.*
@ -61,9 +62,9 @@ class DBTransactionStorage : TransactionStorage {
val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized() val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction() override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
synchronized(txStorage) { synchronized(txStorage) {
return Pair(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) return DataFeed(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -4,6 +4,7 @@ import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.StateMachineTransactionMapping
import rx.Observable import rx.Observable
@ -32,9 +33,9 @@ class InMemoryStateMachineRecordedTransactionMappingStorage : StateMachineRecord
} }
override fun track(): override fun track():
Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> { DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
mutex.locked { mutex.locked {
return Pair( return DataFeed(
stateMachineTransactionMap.flatMap { entry -> stateMachineTransactionMap.flatMap { entry ->
entry.value.map { entry.value.map {
StateMachineTransactionMapping(entry.key, it) StateMachineTransactionMapping(entry.key, it)

View File

@ -16,8 +16,12 @@ import com.google.common.util.concurrent.ListenableFuture
import io.requery.util.CloseableIterator import io.requery.util.CloseableIterator
import net.corda.core.* import net.corda.core.*
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.* import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.* import net.corda.core.serialization.*
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
@ -88,6 +92,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
} }
throw UnsupportedOperationException(message) throw UnsupportedOperationException(message)
} }
override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!") override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!")
} }
@ -226,9 +231,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and * Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
* calls to [allStateMachines] * calls to [allStateMachines]
*/ */
fun track(): Pair<List<FlowStateMachineImpl<*>>, Observable<Change>> { fun track(): DataFeed<List<FlowStateMachineImpl<*>>, Change> {
return mutex.locked { return mutex.locked {
Pair(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) DataFeed(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -19,6 +19,7 @@ import net.corda.core.crypto.containsAny
import net.corda.core.crypto.toBase58String import net.corda.core.crypto.toBase58String
import net.corda.core.identity.AbstractParty import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.serialization.* import net.corda.core.serialization.*
@ -170,9 +171,9 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
override val updatesPublisher: PublishSubject<Vault.Update> override val updatesPublisher: PublishSubject<Vault.Update>
get() = mutex.locked { _updatesPublisher } get() = mutex.locked { _updatesPublisher }
override fun track(): Pair<Vault<ContractState>, Observable<Vault.Update>> { override fun track(): DataFeed<Vault<ContractState>, Vault.Update> {
return mutex.locked { return mutex.locked {
Pair(Vault(unconsumedStates<ContractState>()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) DataFeed(Vault(unconsumedStates<ContractState>()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }

View File

@ -14,6 +14,7 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.* import net.corda.core.node.services.*
@ -670,7 +671,7 @@ class TwoPartyTradeFlowTests {
class RecordingTransactionStorage(val database: Database, val delegate: TransactionStorage) : TransactionStorage { class RecordingTransactionStorage(val database: Database, val delegate: TransactionStorage) : TransactionStorage {
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction { return database.transaction {
delegate.track() delegate.track()
} }

View File

@ -5,6 +5,7 @@ import net.corda.core.contracts.Attachment
import net.corda.core.crypto.* import net.corda.core.crypto.*
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
@ -157,8 +158,8 @@ class MockStateMachineRecordedTransactionMappingStorage(
) : StateMachineRecordedTransactionMappingStorage by storage ) : StateMachineRecordedTransactionMappingStorage by storage
open class MockTransactionStorage : TransactionStorage { open class MockTransactionStorage : TransactionStorage {
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return Pair(txns.values.toList(), _updatesPublisher) return DataFeed(txns.values.toList(), _updatesPublisher)
} }
private val txns = HashMap<SecureHash, SignedTransaction>() private val txns = HashMap<SecureHash, SignedTransaction>()