From e02c37c06da65368adaf2cdda554460351f1576a Mon Sep 17 00:00:00 2001 From: Patrick Kuo Date: Wed, 28 Jun 2017 11:06:06 +0100 Subject: [PATCH] Replace kotlin Pair with `DataFeed` data class (#930) * Replace kotlin Pair with DataFeed data class * remove unintended changes * minor fix * address PR issues --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 12 ++--- .../net/corda/core/messaging/CordaRPCOps.kt | 47 ++++++++++++++----- .../core/node/services/NetworkMapCache.kt | 3 +- .../net/corda/core/node/services/Services.kt | 13 ++--- ...achineRecordedTransactionMappingStorage.kt | 3 +- .../core/node/services/TransactionStorage.kt | 3 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 15 +++--- .../network/InMemoryNetworkMapCache.kt | 5 +- .../DBTransactionMappingStorage.kt | 5 +- .../persistence/DBTransactionStorage.kt | 5 +- ...achineRecordedTransactionMappingStorage.kt | 5 +- .../statemachine/StateMachineManager.kt | 15 ++++-- .../node/services/vault/NodeVaultService.kt | 5 +- .../node/messaging/TwoPartyTradeFlowTests.kt | 3 +- .../net/corda/testing/node/MockServices.kt | 5 +- 15 files changed, 90 insertions(+), 54 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index a73cda8427..4583d14a62 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -4,13 +4,13 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.SecureHash import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine +import net.corda.core.messaging.DataFeed import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.debug import org.slf4j.Logger -import rx.Observable /** * A sub-class of [FlowLogic] implements a flow using direct, straight line blocking code. Thus you @@ -180,7 +180,7 @@ abstract class FlowLogic { * @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded. */ @Throws(FlowException::class) - fun checkFlowPermission(permissionName: String, extraAuditData: Map) = stateMachine.checkFlowPermission(permissionName, extraAuditData) + fun checkFlowPermission(permissionName: String, extraAuditData: Map) = stateMachine.checkFlowPermission(permissionName, extraAuditData) /** @@ -189,7 +189,7 @@ abstract class FlowLogic { * @param comment a general human readable summary of the event. * @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded. */ - fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map) = stateMachine.recordAuditEvent(eventType, comment, extraAuditData) + fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map) = stateMachine.recordAuditEvent(eventType, comment, extraAuditData) /** * Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something @@ -215,10 +215,10 @@ abstract class FlowLogic { * * @return Returns null if this flow has no progress tracker. */ - fun track(): Pair>? { + fun track(): DataFeed? { // TODO this is not threadsafe, needs an atomic get-step-and-subscribe return progressTracker?.let { - it.currentStep.label to it.changes.map { it.toString() } + DataFeed(it.currentStep.label, it.changes.map { it.toString() }) } } @@ -230,7 +230,7 @@ abstract class FlowLogic { @Suspendable fun waitForLedgerCommit(hash: SecureHash): SignedTransaction = stateMachine.waitForLedgerCommit(hash, this) - //////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////// private var _stateMachine: FlowStateMachine<*>? = null /** diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index bfe9910de8..807519cb96 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -32,7 +32,7 @@ data class StateMachineInfo( val id: StateMachineRunId, val flowLogicClassName: String, val initiator: FlowInitiator, - val progressTrackerStepAndUpdates: Pair>? + val progressTrackerStepAndUpdates: DataFeed? ) { override fun toString(): String = "${javaClass.simpleName}($id, $flowLogicClassName)" } @@ -52,9 +52,6 @@ sealed class StateMachineUpdate { * RPC operations that the node exposes to clients using the Java client library. These can be called from * client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class. */ - -// TODO: The use of Pairs throughout is unfriendly for Java interop. - interface CordaRPCOps : RPCOps { /** * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed @@ -63,10 +60,13 @@ interface CordaRPCOps : RPCOps { override val protocolVersion: Int get() = nodeIdentity().platformVersion /** - * Returns a pair of currently in-progress state machine infos and an observable of future state machine adds/removes. + * Returns a data feed of currently in-progress state machine infos and an observable of future state machine adds/removes. */ @RPCReturnsObservables - fun stateMachinesAndUpdates(): Pair, Observable> + fun stateMachinesFeed(): DataFeed, StateMachineUpdate> + + @Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachinesFeed()")) + fun stateMachinesAndUpdates() = stateMachinesFeed() /** * Returns a snapshot of vault states for a given query criteria (and optional order and paging specification) @@ -119,7 +119,7 @@ interface CordaRPCOps : RPCOps { * * Notes: the snapshot part of the query adheres to the same behaviour as the [queryBy] function. * the [QueryCriteria] applies to both snapshot and deltas (streaming updates). - */ + */ // DOCSTART VaultTrackByAPI @RPCReturnsObservables fun vaultTrackBy(criteria: QueryCriteria, @@ -147,31 +147,41 @@ interface CordaRPCOps : RPCOps { // DOCEND VaultTrackAPIHelpers /** - * Returns a pair of head states in the vault and an observable of future updates to the vault. + * Returns a data feed of head states in the vault and an observable of future updates to the vault. */ @RPCReturnsObservables // TODO: Remove this from the interface @Deprecated("This function will be removed in a future milestone", ReplaceWith("vaultTrackBy(QueryCriteria())")) - fun vaultAndUpdates(): Pair>, Observable> + fun vaultAndUpdates(): DataFeed>, Vault.Update> /** - * Returns a pair of all recorded transactions and an observable of future recorded ones. + * Returns a data feed of all recorded transactions and an observable of future recorded ones. */ @RPCReturnsObservables - fun verifiedTransactions(): Pair, Observable> + fun verifiedTransactionsFeed(): DataFeed, SignedTransaction> + + @Deprecated("This function will be removed in a future milestone", ReplaceWith("verifiedTransactionFeed()")) + fun verifiedTransactions() = verifiedTransactionsFeed() + /** * Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future * such mappings as well. */ @RPCReturnsObservables - fun stateMachineRecordedTransactionMapping(): Pair, Observable> + fun stateMachineRecordedTransactionMappingFeed(): DataFeed, StateMachineTransactionMapping> + + @Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachineRecordedTransactionMappingFeed()")) + fun stateMachineRecordedTransactionMapping() = stateMachineRecordedTransactionMappingFeed() /** * Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network. */ @RPCReturnsObservables - fun networkMapUpdates(): Pair, Observable> + fun networkMapFeed(): DataFeed, NetworkMapCache.MapChange> + + @Deprecated("This function will be removed in a future milestone", ReplaceWith("networkMapFeed()")) + fun networkMapUpdates() = networkMapFeed() /** * Start the given flow with the given arguments. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC]. @@ -382,3 +392,14 @@ inline fun > CordaRPCOps.startTrac arg2: C, arg3: D ): FlowProgressHandle = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) + +/** + * The Data feed contains a snapshot of the requested data and an [Observable] of future updates. + */ +@CordaSerializable +data class DataFeed(val snapshot: A, val updates: Observable) { + @Deprecated("This function will be removed in a future milestone", ReplaceWith("snapshot")) + val first: A get() = snapshot + @Deprecated("This function will be removed in a future milestone", ReplaceWith("updates")) + val second: Observable get() = updates +} diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 027383031d..7713bc35bb 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -3,6 +3,7 @@ package net.corda.core.node.services import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.Contract import net.corda.core.identity.Party +import net.corda.core.messaging.DataFeed import net.corda.core.node.NodeInfo import net.corda.core.randomOrNull import net.corda.core.serialization.CordaSerializable @@ -48,7 +49,7 @@ interface NetworkMapCache { * Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the * first subscriber is registered so as to avoid racing with early updates. */ - fun track(): Pair, Observable> + fun track(): DataFeed, MapChange> /** Get the collection of nodes which advertise a specific service. */ fun getNodesWithService(serviceType: ServiceType): List { diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index dfb7c4c5e1..db14d69bfd 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -3,7 +3,6 @@ package net.corda.core.node.services import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.* -import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.SecureHash @@ -12,7 +11,9 @@ import net.corda.core.flows.FlowException import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate +import net.corda.core.messaging.DataFeed import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.OpaqueBytes @@ -71,9 +72,9 @@ class Vault(val states: Iterable>) { /** Checks whether the update contains a state of the specified type and state status */ fun containsType(clazz: Class, status: StateStatus) = - when(status) { + when (status) { StateStatus.UNCONSUMED -> produced.any { clazz.isAssignableFrom(it.state.data.javaClass) } - StateStatus.CONSUMED -> consumed.any { clazz.isAssignableFrom(it.state.data.javaClass) } + StateStatus.CONSUMED -> consumed.any { clazz.isAssignableFrom(it.state.data.javaClass) } else -> consumed.any { clazz.isAssignableFrom(it.state.data.javaClass) } || produced.any { clazz.isAssignableFrom(it.state.data.javaClass) } } @@ -142,7 +143,7 @@ class Vault(val states: Iterable>) { val lockUpdateTime: Instant?) @CordaSerializable - data class PageAndUpdates (val current: Vault.Page, val future: Observable) + data class PageAndUpdates(val current: Vault.Page, val future: Observable) } /** @@ -189,7 +190,7 @@ interface VaultService { */ // TODO: Remove this from the interface @Deprecated("This function will be removed in a future milestone", ReplaceWith("trackBy(QueryCriteria())")) - fun track(): Pair, Observable> + fun track(): DataFeed, Vault.Update> /** * Return unconsumed [ContractState]s for a given set of [StateRef]s @@ -274,7 +275,7 @@ interface VaultService { * Optionally may specify whether to include [StateRef] that have been marked as soft locked (default is true) */ // TODO: Remove this from the interface - @Deprecated("This function will be removed in a future milestone", ReplaceWith("queryBy(QueryCriteria())")) + @Deprecated("This function will be removed in a future milestone", ReplaceWith("queryBy(QueryCriteria())")) fun states(clazzes: Set>, statuses: EnumSet, includeSoftLockedStates: Boolean = true): Iterable> // DOCEND VaultStatesQuery diff --git a/core/src/main/kotlin/net/corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt index 66171bd2ed..98ca2272c2 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/StateMachineRecordedTransactionMappingStorage.kt @@ -2,6 +2,7 @@ package net.corda.core.node.services import net.corda.core.crypto.SecureHash import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.DataFeed import net.corda.core.serialization.CordaSerializable import rx.Observable @@ -14,5 +15,5 @@ data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRun */ interface StateMachineRecordedTransactionMappingStorage { fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) - fun track(): Pair, Observable> + fun track(): DataFeed, StateMachineTransactionMapping> } diff --git a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt index a6788cc1d2..380ba12365 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt @@ -1,6 +1,7 @@ package net.corda.core.node.services import net.corda.core.crypto.SecureHash +import net.corda.core.messaging.DataFeed import net.corda.core.transactions.SignedTransaction import rx.Observable @@ -22,7 +23,7 @@ interface ReadOnlyTransactionStorage { /** * Returns all currently stored transactions and further fresh ones. */ - fun track(): Pair, Observable> + fun track(): DataFeed, SignedTransaction> } /** diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 479b58c8db..881ae4644d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -14,7 +14,6 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault -import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort @@ -43,16 +42,16 @@ class CordaRPCOpsImpl( private val smm: StateMachineManager, private val database: Database ) : CordaRPCOps { - override fun networkMapUpdates(): Pair, Observable> { + override fun networkMapFeed(): DataFeed, NetworkMapCache.MapChange> { return database.transaction { services.networkMapCache.track() } } - override fun vaultAndUpdates(): Pair>, Observable> { + override fun vaultAndUpdates(): DataFeed>, Vault.Update> { return database.transaction { val (vault, updates) = services.vaultService.track() - Pair(vault.states.toList(), updates) + DataFeed(vault.states.toList(), updates) } } @@ -75,23 +74,23 @@ class CordaRPCOpsImpl( } } - override fun verifiedTransactions(): Pair, Observable> { + override fun verifiedTransactionsFeed(): DataFeed, SignedTransaction> { return database.transaction { services.storageService.validatedTransactions.track() } } - override fun stateMachinesAndUpdates(): Pair, Observable> { + override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { return database.transaction { val (allStateMachines, changes) = smm.track() - Pair( + DataFeed( allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) }, changes.map { stateMachineUpdateFromStateMachineChange(it) } ) } } - override fun stateMachineRecordedTransactionMapping(): Pair, Observable> { + override fun stateMachineRecordedTransactionMappingFeed(): DataFeed, StateMachineTransactionMapping> { return database.transaction { services.storageService.stateMachineRecordedTransactionMapping.track() } diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index f25c210c59..ab5bbeee03 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.bufferUntilSubscribed import net.corda.core.identity.Party import net.corda.core.map +import net.corda.core.messaging.DataFeed import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.node.services.DEFAULT_SESSION_ID @@ -71,9 +72,9 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach override fun getNodeByLegalIdentityKey(identityKey: PublicKey): NodeInfo? = registeredNodes[identityKey] - override fun track(): Pair, Observable> { + override fun track(): DataFeed, MapChange> { synchronized(_changed) { - return Pair(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + return DataFeed(partyNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt index 69cdf43e5c..bbf58c3524 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionMappingStorage.kt @@ -4,6 +4,7 @@ import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.SecureHash import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.DataFeed import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.node.utilities.* @@ -55,9 +56,9 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag } } - override fun track(): Pair, Observable> { + override fun track(): DataFeed, StateMachineTransactionMapping> { mutex.locked { - return Pair( + return DataFeed( stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) }, updates.bufferUntilSubscribed().wrapWithDatabaseTransaction() ) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 04e42dcb13..51139177f1 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -3,6 +3,7 @@ package net.corda.node.services.persistence import com.google.common.annotations.VisibleForTesting import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.SecureHash +import net.corda.core.messaging.DataFeed import net.corda.core.node.services.TransactionStorage import net.corda.core.transactions.SignedTransaction import net.corda.node.utilities.* @@ -61,9 +62,9 @@ class DBTransactionStorage : TransactionStorage { val updatesPublisher = PublishSubject.create().toSerialized() override val updates: Observable = updatesPublisher.wrapWithDatabaseTransaction() - override fun track(): Pair, Observable> { + override fun track(): DataFeed, SignedTransaction> { synchronized(txStorage) { - return Pair(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + return DataFeed(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt index 301fde426f..f0aaa50e86 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/InMemoryStateMachineRecordedTransactionMappingStorage.kt @@ -4,6 +4,7 @@ import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.SecureHash import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.DataFeed import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage import net.corda.core.node.services.StateMachineTransactionMapping import rx.Observable @@ -32,9 +33,9 @@ class InMemoryStateMachineRecordedTransactionMappingStorage : StateMachineRecord } override fun track(): - Pair, Observable> { + DataFeed, StateMachineTransactionMapping> { mutex.locked { - return Pair( + return DataFeed( stateMachineTransactionMap.flatMap { entry -> entry.value.map { StateMachineTransactionMapping(entry.key, it) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 595ac8dc47..eac032e0e3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -16,8 +16,12 @@ import com.google.common.util.concurrent.ListenableFuture import io.requery.util.CloseableIterator import net.corda.core.* import net.corda.core.crypto.SecureHash -import net.corda.core.flows.* +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowInitiator +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party +import net.corda.core.messaging.DataFeed import net.corda.core.serialization.* import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor @@ -88,6 +92,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } throw UnsupportedOperationException(message) } + override fun read(kryo: Kryo, input: Input, type: Class) = throw IllegalStateException("Should not reach here!") } @@ -107,8 +112,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, sealed class Change { abstract val logic: FlowLogic<*> - data class Add(override val logic: FlowLogic<*>): Change() - data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>): Change() + data class Add(override val logic: FlowLogic<*>) : Change() + data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>) : Change() } // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines @@ -226,9 +231,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, * Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and * calls to [allStateMachines] */ - fun track(): Pair>, Observable> { + fun track(): DataFeed>, Change> { return mutex.locked { - Pair(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + DataFeed(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 0f014302c1..159f784bdb 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -19,6 +19,7 @@ import net.corda.core.crypto.containsAny import net.corda.core.crypto.toBase58String import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party +import net.corda.core.messaging.DataFeed import net.corda.core.node.ServiceHub import net.corda.core.node.services.* import net.corda.core.serialization.* @@ -170,9 +171,9 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P override val updatesPublisher: PublishSubject get() = mutex.locked { _updatesPublisher } - override fun track(): Pair, Observable> { + override fun track(): DataFeed, Vault.Update> { return mutex.locked { - Pair(Vault(unconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) + DataFeed(Vault(unconsumedStates()), _updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction()) } } diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 493fe77ce1..c8d8876680 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -14,6 +14,7 @@ import net.corda.core.identity.AbstractParty import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine +import net.corda.core.messaging.DataFeed import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.node.services.* @@ -670,7 +671,7 @@ class TwoPartyTradeFlowTests { class RecordingTransactionStorage(val database: Database, val delegate: TransactionStorage) : TransactionStorage { - override fun track(): Pair, Observable> { + override fun track(): DataFeed, SignedTransaction> { return database.transaction { delegate.track() } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index d7366d7663..c3dd7e6e1f 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -5,6 +5,7 @@ import net.corda.core.contracts.Attachment import net.corda.core.crypto.* import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.PartyAndCertificate +import net.corda.core.messaging.DataFeed import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub @@ -157,8 +158,8 @@ class MockStateMachineRecordedTransactionMappingStorage( ) : StateMachineRecordedTransactionMappingStorage by storage open class MockTransactionStorage : TransactionStorage { - override fun track(): Pair, Observable> { - return Pair(txns.values.toList(), _updatesPublisher) + override fun track(): DataFeed, SignedTransaction> { + return DataFeed(txns.values.toList(), _updatesPublisher) } private val txns = HashMap()