Removed deprecated methods in DataFeed and deprecated feed-based methods in CordaRPCOps

This commit is contained in:
Shams Asari
2017-07-31 14:23:17 +01:00
parent 6490638ee4
commit 4a600121cc
15 changed files with 66 additions and 87 deletions

View File

@ -11,9 +11,9 @@ import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.Vault
import net.corda.core.utilities.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import rx.Observable
import rx.subjects.PublishSubject
@ -63,7 +63,7 @@ class NodeMonitorModel {
val connection = client.start(username, password)
val proxy = connection.proxy
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates()
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine ->
@ -86,19 +86,19 @@ class NodeMonitorModel {
// Vault updates
val (vault, vaultUpdates) = proxy.vaultAndUpdates()
val initialVaultUpdate = Vault.Update<ContractState>(setOf(), vault.toSet())
val initialVaultUpdate = Vault.Update(setOf(), vault.toSet())
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)
// Transactions
val (transactions, newTransactions) = proxy.verifiedTransactions()
val (transactions, newTransactions) = proxy.verifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject)
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMapping()
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject)
// Parties on network
val (parties, futurePartyUpdate) = proxy.networkMapUpdates()
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)
proxyObservable.set(proxy)

View File

@ -128,10 +128,9 @@ class CordaRPCClientTest : NodeBasedTest() {
fun `flow initiator via RPC`() {
login(rpcUser.username, rpcUser.password)
val proxy = connection!!.proxy
val smUpdates = proxy.stateMachinesAndUpdates()
var countRpcFlows = 0
var countShellFlows = 0
smUpdates.second.subscribe {
proxy.stateMachinesFeed().updates.subscribe {
if (it is StateMachineUpdate.Added) {
val initiator = it.stateMachineInfo.initiator
if (initiator is FlowInitiator.RPC)

View File

@ -118,7 +118,7 @@ class StandaloneCordaRPClientTest {
@Test
fun `test state machines`() {
val (stateMachines, updates) = rpcProxy.stateMachinesAndUpdates()
val (stateMachines, updates) = rpcProxy.stateMachinesFeed()
assertEquals(0, stateMachines.size)
val updateCount = AtomicInteger(0)

View File

@ -52,10 +52,7 @@ sealed class StateMachineUpdate {
@CordaSerializable
data class StateMachineTransactionMapping(val stateMachineRunId: StateMachineRunId, val transactionId: SecureHash)
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class.
*/
/** RPC operations that the node exposes to clients. */
interface CordaRPCOps : RPCOps {
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
@ -69,9 +66,6 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachinesFeed()"))
fun stateMachinesAndUpdates() = stateMachinesFeed()
/**
* Returns a snapshot of vault states for a given query criteria (and optional order and paging specification)
*
@ -171,10 +165,6 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun verifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("verifiedTransactionFeed()"))
fun verifiedTransactions() = verifiedTransactionsFeed()
/**
* Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future
* such mappings as well.
@ -182,18 +172,12 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachineRecordedTransactionMappingFeed()"))
fun stateMachineRecordedTransactionMapping() = stateMachineRecordedTransactionMappingFeed()
/**
* Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
*/
@RPCReturnsObservables
fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("networkMapFeed()"))
fun networkMapUpdates() = networkMapFeed()
/**
* Start the given flow with the given arguments. [logicType] must be annotated with [net.corda.core.flows.StartableByRPC].
*/
@ -430,13 +414,4 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startTrac
* The Data feed contains a snapshot of the requested data and an [Observable] of future updates.
*/
@CordaSerializable
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>) {
@Deprecated("This function will be removed in a future milestone", ReplaceWith("snapshot"))
val first: A get() = snapshot
@Deprecated("This function will be removed in a future milestone", ReplaceWith("updates"))
val second: Observable<B> get() = updates
@Deprecated("This function will be removed in a future milestone", ReplaceWith("snapshot"))
val current: A get() = snapshot
@Deprecated("This function will be removed in a future milestone", ReplaceWith("updates"))
val future: Observable<B> get() = updates
}
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>)

View File

@ -60,6 +60,18 @@ UNRELEASED
* ``Cordformation`` adds a ``corda`` and ``cordaRuntime`` configuration to projects which cordapp developers should
use to exclude core Corda JARs from being built into Cordapp fat JARs.
.. Milestone 15:
* Following deprecated methods have been removed:
* In ``DataFeed``
* ``first`` and ``current``, replaced by ``snapshot``
* ``second`` and ``future``, replaced by ``updates``
* In ``CordaRPCOps``
* ``stateMachinesAndUpdates``, replaced by ``stateMachinesFeed``
* ``verifiedTransactions``, replaced by ``verifiedTransactionsFeed``
* ``stateMachineRecordedTransactionMapping``, replaced by ``stateMachineRecordedTransactionMappingFeed``
* ``networkMapUpdates``, replaced by ``networkMapFeed``
Milestone 13
------------

View File

@ -9,17 +9,17 @@ import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.serialization.SerializationCustomization
import net.corda.core.transactions.SignedTransaction
import net.corda.testing.ALICE
import net.corda.testing.DUMMY_NOTARY
import net.corda.core.utilities.OpaqueBytes
import net.corda.flows.CashExitFlow
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.driver
import org.graphstream.graph.Edge
import org.graphstream.graph.Node
@ -63,7 +63,7 @@ fun main(args: Array<String>) {
// END 2
// START 3
val (transactions: List<SignedTransaction>, futureTransactions: Observable<SignedTransaction>) = proxy.verifiedTransactions()
val (transactions: List<SignedTransaction>, futureTransactions: Observable<SignedTransaction>) = proxy.verifiedTransactionsFeed()
// END 3
// START 4
@ -71,8 +71,8 @@ fun main(args: Array<String>) {
PrintOrVisualise.Print -> {
futureTransactions.startWith(transactions).subscribe { transaction ->
println("NODE ${transaction.id}")
transaction.tx.inputs.forEach { input ->
println("EDGE ${input.txhash} ${transaction.id}")
transaction.tx.inputs.forEach { (txhash) ->
println("EDGE $txhash ${transaction.id}")
}
}
}
@ -111,7 +111,7 @@ fun generateTransactions(proxy: CordaRPCOps) {
sum + (state.state.data as Cash.State).amount.quantity
}
val issueRef = OpaqueBytes.of(0)
val (parties, partyUpdates) = proxy.networkMapUpdates()
val (parties, partyUpdates) = proxy.networkMapFeed()
partyUpdates.notUsed()
val notary = parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val me = proxy.nodeIdentity().legalIdentity

View File

@ -1,27 +1,23 @@
package net.corda.node.services
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.contracts.Amount
import net.corda.core.contracts.POUNDS
import net.corda.core.identity.Party
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.testing.ALICE
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.nodeapi.User
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.*
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.DriverBasedTest
import net.corda.testing.replicate
import org.junit.Test
import rx.Observable
import java.util.*
@ -65,7 +61,7 @@ class DistributedServiceTests : DriverBasedTest() {
aliceProxy = connectRpc(alice)
val rpcClientsToNotaries = notaries.map(::connectRpc)
notaryStateMachines = Observable.from(rpcClientsToNotaries.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
proxy.stateMachinesFeed().updates.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
runTest()
@ -86,8 +82,7 @@ class DistributedServiceTests : DriverBasedTest() {
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
expect(match = { it.second is StateMachineUpdate.Added }) { (notary, update) ->
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _, number -> number?.plus(1) ?: 1 }
}
@ -125,8 +120,7 @@ class DistributedServiceTests : DriverBasedTest() {
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(30) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
expect(match = { it.second is StateMachineUpdate.Added }) { (notary, update) ->
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _, number -> number?.plus(1) ?: 1 }
}

View File

@ -114,7 +114,7 @@ class CordaRPCOpsImpl(
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.second ?: Observable.empty()
progress = stateMachine.logic.track()?.updates ?: Observable.empty()
)
}

View File

@ -308,7 +308,7 @@ object InteractiveShell {
@JvmStatic
fun runStateMachinesView(out: RenderPrintWriter): Any? {
val proxy = node.rpcOps
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates()
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)

View File

@ -12,8 +12,8 @@ import net.corda.core.messaging.*
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.unconsumedStates
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.internal.CordaRPCOpsImpl
@ -69,10 +69,10 @@ class CordaRPCOpsImplTest {
))))
aliceNode.database.transaction {
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
vaultUpdates = rpc.vaultAndUpdates().second
vaultTrackCash = rpc.vaultTrackBy<Cash.State>().future
stateMachineUpdates = rpc.stateMachinesFeed().updates
transactions = rpc.verifiedTransactionsFeed().updates
vaultUpdates = rpc.vaultAndUpdates().updates
vaultTrackCash = rpc.vaultTrackBy<Cash.State>().updates
}
}
@ -110,7 +110,7 @@ class CordaRPCOpsImplTest {
)
}
val tx = result.returnValue.getOrThrow()
result.returnValue.getOrThrow()
val expectedState = Cash.State(Amount(quantity,
Issued(aliceNode.info.legalIdentity.ref(ref), GBP)),
recipient)
@ -139,7 +139,7 @@ class CordaRPCOpsImplTest {
fun `issue and move`() {
val anonymous = false
val result = rpc.startFlow(::CashIssueFlow,
Amount(100, USD),
100.DOLLARS,
OpaqueBytes(ByteArray(1, { 1 })),
aliceNode.info.legalIdentity,
notaryNode.info.notaryIdentity,
@ -148,13 +148,13 @@ class CordaRPCOpsImplTest {
mockNet.runNetwork()
rpc.startFlow(::CashPaymentFlow, Amount(100, USD), aliceNode.info.legalIdentity, anonymous)
rpc.startFlow(::CashPaymentFlow, 100.DOLLARS, aliceNode.info.legalIdentity, anonymous)
mockNet.runNetwork()
var issueSmId: StateMachineRunId? = null
var moveSmId: StateMachineRunId? = null
stateMachineUpdates.expectEvents() {
stateMachineUpdates.expectEvents {
sequence(
// ISSUE
expect { add: StateMachineUpdate.Added ->
@ -202,14 +202,14 @@ class CordaRPCOpsImplTest {
vaultUpdates.expectEvents {
sequence(
// ISSUE
expect { update ->
require(update.consumed.isEmpty()) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
expect { (consumed, produced) ->
require(consumed.isEmpty()) { consumed.size }
require(produced.size == 1) { produced.size }
},
// MOVE
expect { update ->
require(update.consumed.size == 1) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
expect { (consumed, produced) ->
require(consumed.size == 1) { consumed.size }
require(produced.size == 1) { produced.size }
}
)
}
@ -217,14 +217,14 @@ class CordaRPCOpsImplTest {
vaultTrackCash.expectEvents {
sequence(
// ISSUE
expect { update ->
require(update.consumed.isEmpty()) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
expect { (consumed, produced) ->
require(consumed.isEmpty()) { consumed.size }
require(produced.size == 1) { produced.size }
},
// MOVE
expect { update ->
require(update.consumed.size == 1) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
expect { (consumed, produced) ->
require(consumed.size == 1) { consumed.size }
require(produced.size == 1) { produced.size }
}
)
}

View File

@ -103,9 +103,8 @@ class ScheduledFlowTests {
@Test
fun `create and run scheduled flow then wait for result`() {
val stateMachines = nodeA.smm.track()
var countScheduledFlows = 0
stateMachines.second.subscribe {
nodeA.smm.track().updates.subscribe {
if (it is StateMachineManager.Change.Add) {
val initiator = it.logic.stateMachine.flowInitiator
if (initiator is FlowInitiator.Scheduled)

View File

@ -121,7 +121,7 @@ class AttachmentDemoFlow(val otherSide: Party, val hash: SecureHash.SHA256) : Fl
fun recipient(rpc: CordaRPCOps) {
println("Waiting to receive transaction ...")
val stx = rpc.verifiedTransactions().second.toBlocking().first()
val stx = rpc.verifiedTransactionsFeed().updates.toBlocking().first()
val wtx = stx.tx
if (wtx.attachments.isNotEmpty()) {
if (wtx.outputs.isNotEmpty()) {

View File

@ -81,7 +81,7 @@ class IRSDemoTest : IntegrationTestCategory {
fun getFixingDateObservable(config: FullNodeConfiguration): Observable<LocalDate?> {
val client = CordaRPCClient(config.rpcAddress!!, initialiseSerialization = false)
val proxy = client.start("user", "password").proxy
val vaultUpdates = proxy.vaultAndUpdates().second
val vaultUpdates = proxy.vaultAndUpdates().updates
return vaultUpdates.map { update ->
val irsStates = update.produced.map { it.state.data }.filterIsInstance<InterestRateSwap.State>()

View File

@ -27,14 +27,14 @@ fun main(args: Array<String>) {
/** Interface for using the notary demo API from a client. */
private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
private val notary by lazy {
val (parties, partyUpdates) = rpc.networkMapUpdates()
val (parties, partyUpdates) = rpc.networkMapFeed()
partyUpdates.notUsed()
val id = parties.stream().filter { it.advertisedServices.any { it.info.type.isNotary() } }.map { it.notaryIdentity }.distinct().asSequence().singleOrNull()
checkNotNull(id) { "No unique notary identity, try cleaning the node directories." }
}
private val counterpartyNode by lazy {
val (parties, partyUpdates) = rpc.networkMapUpdates()
val (parties, partyUpdates) = rpc.networkMapFeed()
partyUpdates.notUsed()
parties.single { it.legalIdentity.name == BOB.name }
}

View File

@ -181,7 +181,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
log.info("Getting node info of ${connection.remoteNode.hostname}")
val info = connection.info
log.info("Got node info of ${connection.remoteNode.hostname}: $info!")
val (otherInfo, infoUpdates) = connection.proxy.networkMapUpdates()
val (otherInfo, infoUpdates) = connection.proxy.networkMapFeed()
infoUpdates.notUsed()
val pubKeysString = otherInfo.map {
" ${it.legalIdentity.name}: ${it.legalIdentity.owningKey.toBase58String()}"