Introducing Observable.toFuture() extension method

This commit is contained in:
Shams Asari
2017-01-17 11:40:33 +00:00
parent 429fbb3b97
commit c4e3b258c7
10 changed files with 85 additions and 68 deletions

View File

@ -363,8 +363,7 @@ data class ErrorOr<out A> private constructor(val value: A?, val error: Throwabl
} }
/** /**
* Returns an observable that buffers events until subscribed. * Returns an Observable that buffers events until subscribed.
*
* @see UnicastSubject * @see UnicastSubject
*/ */
fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> { fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
@ -383,5 +382,18 @@ fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
return subject return subject
} }
/** Allows summing big decimals that are in iterable collections */ /**
* Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* NoSuchElementException if no items are emitted or any other error thrown by the Observable.
*/
fun <T> Observable<T>.toFuture(): ListenableFuture<T> {
val future = SettableFuture.create<T>()
first().subscribe(
{ future.set(it) },
{ future.setException(it) }
)
return future
}
/** Return the sum of an Iterable of [BigDecimal]s. */
fun Iterable<BigDecimal>.sum(): BigDecimal = fold(BigDecimal.ZERO) { a, b -> a + b } fun Iterable<BigDecimal>.sum(): BigDecimal = fold(BigDecimal.ZERO) { a, b -> a + b }

View File

@ -176,5 +176,6 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow
data class FlowHandle<A>( data class FlowHandle<A>(
val id: StateMachineRunId, val id: StateMachineRunId,
val progress: Observable<String>, val progress: Observable<String>,
// TODO This should be ListenableFuture<A>
val returnValue: Observable<A> val returnValue: Observable<A>
) )

View File

@ -131,7 +131,7 @@ inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossin
/** /**
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId. * Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* The payload is deserilaized to an object of type [M]. Any exceptions thrown will be captured by the future. * The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
*/ */
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> { fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
val messageFuture = SettableFuture.create<M>() val messageFuture = SettableFuture.create<M>()

View File

@ -1,12 +1,12 @@
package net.corda.core.node.services package net.corda.core.node.services
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.* import net.corda.core.contracts.*
import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.toFuture
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction import net.corda.core.transactions.WireTransaction
import rx.Observable import rx.Observable
@ -162,11 +162,7 @@ interface VaultService {
* Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests. * Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests.
*/ */
fun whenConsumed(ref: StateRef): ListenableFuture<Vault.Update> { fun whenConsumed(ref: StateRef): ListenableFuture<Vault.Update> {
val future = SettableFuture.create<Vault.Update>() return updates.filter { it.consumed.any { it.ref == ref } }.toFuture()
updates.filter { it.consumed.any { it.ref == ref } }.first().subscribe {
future.set(it)
}
return future
} }
/** /**

View File

@ -0,0 +1,47 @@
package net.corda.core
import org.assertj.core.api.Assertions.*
import org.junit.Test
import rx.subjects.PublishSubject
import java.util.*
class UtilsTest {
@Test
fun `toFuture - single item observable`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
subject.onNext("Hello")
assertThat(future.getOrThrow()).isEqualTo("Hello")
}
@Test
fun `toFuture - empty obserable`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
subject.onCompleted()
assertThatExceptionOfType(NoSuchElementException::class.java).isThrownBy {
future.getOrThrow()
}
}
@Test
fun `toFuture - more than one item observable`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
subject.onNext("Hello")
subject.onNext("World")
subject.onCompleted()
assertThat(future.getOrThrow()).isEqualTo("Hello")
}
@Test
fun `toFuture - erroring observable`() {
val subject = PublishSubject.create<String>()
val future = subject.toFuture()
val exception = Exception("Error")
subject.onError(exception)
assertThatThrownBy {
future.getOrThrow()
}.isSameAs(exception)
}
}

View File

@ -1,10 +1,10 @@
package net.corda.docs package net.corda.docs
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.* import net.corda.core.contracts.*
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.CashCommand import net.corda.flows.CashCommand
@ -66,14 +66,9 @@ class FxTransactionBuildTutorialTest {
printBalances() printBalances()
// Setup some futures on the vaults to await the arrival of the exchanged funds at both nodes // Setup some futures on the vaults to await the arrival of the exchanged funds at both nodes
val done2 = SettableFuture.create<Unit>() val nodeAVaultUpdate = nodeA.services.vaultService.updates.toFuture()
val done3 = SettableFuture.create<Unit>() val nodeBVaultUpdate = nodeB.services.vaultService.updates.toFuture()
val subs2 = nodeA.services.vaultService.updates.subscribe {
done2.set(Unit)
}
val subs3 = nodeB.services.vaultService.updates.subscribe {
done3.set(Unit)
}
// Now run the actual Fx exchange // Now run the actual Fx exchange
val doIt = nodeA.services.startFlow(ForeignExchangeFlow("trade1", val doIt = nodeA.services.startFlow(ForeignExchangeFlow("trade1",
POUNDS(100).issuedBy(nodeB.info.legalIdentity.ref(0x01)), POUNDS(100).issuedBy(nodeB.info.legalIdentity.ref(0x01)),
@ -83,16 +78,14 @@ class FxTransactionBuildTutorialTest {
// wait for the flow to finish and the vault updates to be done // wait for the flow to finish and the vault updates to be done
doIt.resultFuture.getOrThrow() doIt.resultFuture.getOrThrow()
// Get the balances when the vault updates // Get the balances when the vault updates
done2.get() nodeAVaultUpdate.get()
val balancesA = databaseTransaction(nodeA.database) { val balancesA = databaseTransaction(nodeA.database) {
nodeA.services.vaultService.cashBalances nodeA.services.vaultService.cashBalances
} }
done3.get() nodeBVaultUpdate.get()
val balancesB = databaseTransaction(nodeB.database) { val balancesB = databaseTransaction(nodeB.database) {
nodeB.services.vaultService.cashBalances nodeB.services.vaultService.cashBalances
} }
subs2.unsubscribe()
subs3.unsubscribe()
println("BalanceA\n" + balancesA) println("BalanceA\n" + balancesA)
println("BalanceB\n" + balancesB) println("BalanceB\n" + balancesB)
// Verify the transfers occurred as expected // Verify the transfers occurred as expected

View File

@ -1,6 +1,5 @@
package net.corda.docs package net.corda.docs
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.LinearState import net.corda.core.contracts.LinearState
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
@ -8,6 +7,7 @@ import net.corda.core.getOrThrow
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.linearHeadsOfType import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.toFuture
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
@ -56,17 +56,13 @@ class WorkflowTransactionBuildTutorialTest {
@Test @Test
fun `Run workflow to completion`() { fun `Run workflow to completion`() {
// Setup a vault subscriber to wait for successful upload of the proposal to NodeB // Setup a vault subscriber to wait for successful upload of the proposal to NodeB
val done1 = SettableFuture.create<Unit>() val nodeBVaultUpdate = nodeB.services.vaultService.updates.toFuture()
val subs1 = nodeB.services.vaultService.updates.subscribe {
done1.set(Unit)
}
// Kick of the proposal flow // Kick of the proposal flow
val flow1 = nodeA.services.startFlow(SubmitTradeApprovalFlow("1234", nodeB.info.legalIdentity)) val flow1 = nodeA.services.startFlow(SubmitTradeApprovalFlow("1234", nodeB.info.legalIdentity))
// Wait for the flow to finish // Wait for the flow to finish
val proposalRef = flow1.resultFuture.getOrThrow() val proposalRef = flow1.resultFuture.getOrThrow()
// Wait for NodeB to include it's copy in the vault // Wait for NodeB to include it's copy in the vault
done1.get() nodeBVaultUpdate.get()
subs1.unsubscribe()
// Fetch the latest copy of the state from both nodes // Fetch the latest copy of the state from both nodes
val latestFromA = databaseTransaction(nodeA.database) { val latestFromA = databaseTransaction(nodeA.database) {
nodeA.services.latest<TradeApprovalContract.State>(proposalRef.ref) nodeA.services.latest<TradeApprovalContract.State>(proposalRef.ref)
@ -82,23 +78,15 @@ class WorkflowTransactionBuildTutorialTest {
assertEquals(proposalRef, latestFromA) assertEquals(proposalRef, latestFromA)
assertEquals(proposalRef, latestFromB) assertEquals(proposalRef, latestFromB)
// Setup a vault subscriber to pause until the final update is in NodeA and NodeB // Setup a vault subscriber to pause until the final update is in NodeA and NodeB
val done2 = SettableFuture.create<Unit>() val nodeAVaultUpdate = nodeA.services.vaultService.updates.toFuture()
val subs2 = nodeA.services.vaultService.updates.subscribe { val secondNodeBVaultUpdate = nodeB.services.vaultService.updates.toFuture()
done2.set(Unit)
}
val done3 = SettableFuture.create<Unit>()
val subs3 = nodeB.services.vaultService.updates.subscribe {
done3.set(Unit)
}
// Run the manual completion flow from NodeB // Run the manual completion flow from NodeB
val flow2 = nodeB.services.startFlow(SubmitCompletionFlow(latestFromB.ref, WorkflowState.APPROVED)) val flow2 = nodeB.services.startFlow(SubmitCompletionFlow(latestFromB.ref, WorkflowState.APPROVED))
// wait for the flow to end // wait for the flow to end
val completedRef = flow2.resultFuture.getOrThrow() val completedRef = flow2.resultFuture.getOrThrow()
// wait for the vault updates to stabilise // wait for the vault updates to stabilise
done2.get() nodeAVaultUpdate.get()
done3.get() secondNodeBVaultUpdate.get()
subs2.unsubscribe()
subs3.unsubscribe()
// Fetch the latest copies from the vault // Fetch the latest copies from the vault
val finalFromA = databaseTransaction(nodeA.database) { val finalFromA = databaseTransaction(nodeA.database) {
nodeA.services.latest<TradeApprovalContract.State>(proposalRef.ref) nodeA.services.latest<TradeApprovalContract.State>(proposalRef.ref)

View File

@ -144,7 +144,7 @@ class NodeSchedulerService(private val database: Database,
Pair(earliestState, rescheduled!!) Pair(earliestState, rescheduled!!)
} }
if (scheduledState != null) { if (scheduledState != null) {
schedulerTimerExecutor.execute() { schedulerTimerExecutor.execute {
log.trace { "Scheduling as next $scheduledState" } log.trace { "Scheduling as next $scheduledState" }
// This will block the scheduler single thread until the scheduled time (returns false) OR // This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true). // the Future is cancelled due to rescheduling (returns true).
@ -152,7 +152,7 @@ class NodeSchedulerService(private val database: Database,
log.trace { "Invoking as next $scheduledState" } log.trace { "Invoking as next $scheduledState" }
onTimeReached(scheduledState) onTimeReached(scheduledState)
} else { } else {
log.trace { "Recheduled $scheduledState" } log.trace { "Rescheduled $scheduledState" }
} }
} }
} }

View File

@ -1,11 +1,11 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType import net.corda.core.contracts.TransactionType
import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.NullPublicKey import net.corda.core.crypto.NullPublicKey
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
@ -109,8 +109,7 @@ class DBTransactionStorageTests {
@Test @Test
fun `updates are fired`() { fun `updates are fired`() {
val future = SettableFuture.create<SignedTransaction>() val future = transactionStorage.updates.toFuture()
transactionStorage.updates.subscribe { tx -> future.set(tx) }
val expected = newTransaction() val expected = newTransaction()
databaseTransaction(database) { databaseTransaction(database) {
transactionStorage.addTransaction(expected) transactionStorage.addTransaction(expected)

View File

@ -4,12 +4,12 @@ package net.corda.testing
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.crypto.* import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.DUMMY_NOTARY_KEY
@ -17,12 +17,10 @@ import net.corda.node.internal.AbstractNode
import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager.Change
import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.testing.node.MockIdentityService import net.corda.testing.node.MockIdentityService
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties import net.corda.testing.node.makeTestDataSourceProperties
import rx.Subscriber
import java.net.ServerSocket import java.net.ServerSocket
import java.nio.file.Path import java.nio.file.Path
import java.security.KeyPair import java.security.KeyPair
@ -141,25 +139,8 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List<HostAndPort> {
inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow( inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
markerClass: KClass<out FlowLogic<*>>, markerClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P> { noinline flowFactory: (Party) -> P): ListenableFuture<P> {
val future = smm.changes.filter { it.addOrRemove == ADD && it.logic is P }.map { it.logic as P }.toFuture()
services.registerFlowInitiator(markerClass, flowFactory) services.registerFlowInitiator(markerClass, flowFactory)
val future = SettableFuture.create<P>()
val subscriber = object : Subscriber<Change>() {
override fun onNext(change: Change) {
if (change.addOrRemove == ADD) {
unsubscribe()
future.set(change.logic as P)
}
}
override fun onError(e: Throwable) {
future.setException(e)
}
override fun onCompleted() {}
}
smm.changes.subscribe(subscriber)
return future return future
} }