Refactor then/success/failure (#984)

to make ListenableFuture replacement less fiddly.
This commit is contained in:
Andrzej Cichocki 2017-07-07 15:50:50 +01:00 committed by GitHub
parent 74c8346863
commit d2869e4f45
17 changed files with 90 additions and 117 deletions

View File

@ -5,7 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.success
import net.corda.core.thenMatch
import net.corda.node.services.messaging.getRpcContext
import net.corda.nodeapi.RPCSinceVersion
import net.corda.testing.RPCDriverExposedDSLInterface
@ -158,12 +158,12 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
val clientQuotes = LinkedBlockingQueue<String>()
val clientFuture = proxy.makeComplicatedListenableFuture()
clientFuture.success {
clientFuture.thenMatch({
val name = it.first
it.second.success {
it.second.thenMatch({
clientQuotes += "Quote by $name: $it"
}
}
}, {})
}, {})
assertThat(clientQuotes).isEmpty()

View File

@ -25,7 +25,6 @@ import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
import java.util.function.BiConsumer
import java.util.stream.Stream
import java.util.zip.Deflater
import java.util.zip.ZipEntry
@ -67,38 +66,20 @@ fun <T> Future<T>.getOrThrow(timeout: Duration? = null): T {
}
}
fun <T> future(block: () -> T): ListenableFuture<T> = CompletableToListenable(CompletableFuture.supplyAsync(block))
fun <V> future(block: () -> V): Future<V> = CompletableFuture.supplyAsync(block)
private class CompletableToListenable<T>(private val base: CompletableFuture<T>) : Future<T> by base, ListenableFuture<T> {
override fun addListener(listener: Runnable, executor: Executor) {
base.whenCompleteAsync(BiConsumer { _, _ -> listener.run() }, executor)
}
}
fun <F : ListenableFuture<*>, V> F.then(block: (F) -> V) = addListener(Runnable { block(this) }, MoreExecutors.directExecutor())
// Some utilities for working with Guava listenable futures.
fun <T> ListenableFuture<T>.then(executor: Executor, body: () -> Unit) = addListener(Runnable(body), executor)
fun <T> ListenableFuture<T>.success(executor: Executor, body: (T) -> Unit) = then(executor) {
val r = try {
get()
} catch(e: Throwable) {
return@then
}
body(r)
}
fun <T> ListenableFuture<T>.failure(executor: Executor, body: (Throwable) -> Unit) = then(executor) {
try {
fun <U, V> Future<U>.match(success: (U) -> V, failure: (Throwable) -> V): V {
return success(try {
getOrThrow()
} catch (t: Throwable) {
body(t)
}
return failure(t)
})
}
infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
fun ListenableFuture<*>.andForget(log: Logger) = failure(RunOnCallerThread) { log.error("Background task failed:", it) }
fun <U, V, W> ListenableFuture<U>.thenMatch(success: (U) -> V, failure: (Throwable) -> W) = then { it.match(success, failure) }
fun ListenableFuture<*>.andForget(log: Logger) = then { it.match({}, { log.error("Background task failed:", it) }) }
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, { (mapper as (F?) -> T)(it) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
@ -114,12 +95,12 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
return Observable.create { subscriber ->
success {
thenMatch({
subscriber.onNext(it)
subscriber.onCompleted()
} failure {
}, {
subscriber.onError(it)
}
})
}
}
@ -204,9 +185,6 @@ fun <T> List<T>.randomOrNull(): T? {
/** Returns a random element in the list matching the given predicate, or null if none found */
fun <T> List<T>.randomOrNull(predicate: (T) -> Boolean) = filter(predicate).randomOrNull()
// An alias that can sometimes make code clearer to read.
val RunOnCallerThread: Executor = MoreExecutors.directExecutor()
inline fun elapsedTime(block: () -> Unit): Duration {
val start = System.nanoTime()
block()

View File

@ -4,7 +4,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.failure
import net.corda.core.match
import net.corda.core.then
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -29,7 +29,7 @@ internal fun <S, T> firstOf(futures: Array<out ListenableFuture<out S>>, log: Lo
if (winnerChosen.compareAndSet(false, true)) {
resultFuture.catch { handler(it) }
} else if (!it.isCancelled) {
it.failure { log.error(shortCircuitedTaskFailedMessage, it) }
it.match({}, { log.error(shortCircuitedTaskFailedMessage, it) })
}
}
}

View File

@ -4,13 +4,11 @@ import com.codahale.metrics.JmxReporter
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.flatMap
import net.corda.core.*
import net.corda.core.messaging.RPCOps
import net.corda.core.minutes
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.success
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parseNetworkHostAndPort
@ -298,27 +296,29 @@ open class Node(override val configuration: FullNodeConfiguration,
override fun start(): Node {
super.start()
networkMapRegistrationFuture.success(serverThread) {
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
//
// https://jolokia.org/agent/jvm.html
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
networkMapRegistrationFuture.thenMatch({
serverThread.execute {
// Begin exporting our own metrics via JMX. These can be monitored using any agent, e.g. Jolokia:
//
// https://jolokia.org/agent/jvm.html
JmxReporter.
forRegistry(services.monitoringService.metrics).
inDomain("net.corda").
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
(startupComplete as SettableFuture<Unit>).set(Unit)
}
(startupComplete as SettableFuture<Unit>).set(Unit)
}
}, {})
shutdownHook = addShutdownHook {
stop()
}

View File

@ -103,7 +103,7 @@ open class NodeStartup(val args: Array<String>) {
node.start()
printPluginsAndServices(node)
node.networkMapRegistrationFuture.success {
node.networkMapRegistrationFuture.thenMatch({
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
// TODO: Replace this with a standard function to get an unambiguous rendering of the X.500 name.
val name = node.info.legalIdentity.name.orgName ?: node.info.legalIdentity.name.commonName
@ -111,14 +111,14 @@ open class NodeStartup(val args: Array<String>) {
// Don't start the shell if there's no console attached.
val runShell = !cmdlineOptions.noLocalShell && System.console() != null
node.startupComplete then {
node.startupComplete.then {
try {
InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, node)
} catch(e: Throwable) {
logger.error("Shell failed to start", e)
}
}
}
}, {})
node.run()
}

View File

@ -184,7 +184,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Create a queue, consumer and producer for handling P2P network messages.
p2pConsumer = makeP2PConsumer(session, true)
networkMapRegistrationFuture.success {
networkMapRegistrationFuture.thenMatch({
state.locked {
log.info("Network map is complete, so removing filter from P2P consumer.")
try {
@ -194,7 +194,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
p2pConsumer = makeP2PConsumer(session, false)
}
}
}, {})
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, config.myLegalName)

View File

@ -195,7 +195,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun start() {
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
serviceHub.networkMapCache.mapServiceRegistered.then { executor.execute(this::resumeRestoredFibers) }
}
private fun listenToLedgerTransactions() {

View File

@ -25,7 +25,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
init {
// The future is public and can be completed by something else to indicate we don't wish to follow
// anymore (e.g. the user pressing Ctrl-C).
future then { unsubscribe() }
future.then { unsubscribe() }
}
@Synchronized

View File

@ -394,7 +394,7 @@ object InteractiveShell {
init {
// The future is public and can be completed by something else to indicate we don't wish to follow
// anymore (e.g. the user pressing Ctrl-C).
future then { unsubscribe() }
future.then { unsubscribe() }
}
@Synchronized

View File

@ -3,10 +3,7 @@ package net.corda.netmap.simulation
import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.*
import net.corda.core.*
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UniqueIdentifier
@ -49,7 +46,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val future = SettableFuture.create<Unit>()
om = JacksonSupport.createInMemoryMapper(InMemoryIdentityService((banks + regulators + networkMap).map { it.info.legalIdentityAndCert }, trustRoot = DUMMY_CA.certificate))
startIRSDealBetween(0, 1).success {
startIRSDealBetween(0, 1).thenMatch({
// Next iteration is a pause.
executeOnNextIteration.add {}
executeOnNextIteration.add {
@ -67,16 +64,16 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
executeOnNextIteration.add {
val f = doNextFixing(0, 1)
if (f != null) {
Futures.addCallback(f, this, RunOnCallerThread)
Futures.addCallback(f, this, MoreExecutors.directExecutor())
} else {
// All done!
future.set(Unit)
}
}
}
}, RunOnCallerThread)
}, MoreExecutors.directExecutor())
}
}
}, {})
return future
}

View File

@ -18,8 +18,7 @@ import javafx.scene.layout.HBox
import javafx.scene.layout.VBox
import javafx.util.Duration
import net.corda.core.crypto.commonName
import net.corda.core.failure
import net.corda.core.success
import net.corda.core.match
import net.corda.core.then
import net.corda.core.messaging.CordaRPCOps
import net.corda.demobench.explorer.ExplorerController
@ -157,19 +156,20 @@ class NodeTerminalView : Fragment() {
launchWebButton.graphic = ProgressIndicator()
log.info("Starting web server for ${config.legalName}")
webServer.open(config) then {
webServer.open(config).then {
Platform.runLater {
launchWebButton.graphic = null
}
} success {
log.info("Web server for ${config.legalName} started on $it")
Platform.runLater {
webURL = it
launchWebButton.text = "Reopen\nweb site"
app.hostServices.showDocument(it.toString())
}
} failure {
launchWebButton.text = oldLabel
it.match({
log.info("Web server for ${config.legalName} started on $it")
Platform.runLater {
webURL = it
launchWebButton.text = "Reopen\nweb site"
app.hostServices.showDocument(it.toString())
}
}, {
launchWebButton.text = oldLabel
})
}
}
}

View File

@ -10,14 +10,13 @@ import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
import net.corda.core.contracts.USD
import net.corda.core.failure
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.thenMatch
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.success
import net.corda.flows.*
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
@ -133,11 +132,11 @@ class ExplorerSimulation(val options: OptionSet) {
// Log to logger when flow finish.
fun FlowHandle<AbstractCashFlow.Result>.log(seq: Int, name: String) {
val out = "[$seq] $name $id :"
returnValue.success { (stx) ->
returnValue.thenMatch({ (stx) ->
Main.log.info("$out ${stx.id} ${(stx.tx.outputs.first().data as Cash.State).amount}")
}.failure {
}, {
Main.log.info("$out ${it.message}")
}
})
}
for (i in 0..maxIterations) {

View File

@ -28,7 +28,6 @@ import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.then
import net.corda.core.transactions.SignedTransaction
import net.corda.explorer.formatters.PartyNameFormatter
import net.corda.explorer.model.CashTransaction
@ -106,7 +105,11 @@ class NewTransaction : Fragment() {
command.startFlow(rpcProxy.value!!)
}
runAsync {
handle.returnValue.then { dialog.dialogPane.isDisable = false }.getOrThrow()
try {
handle.returnValue.getOrThrow()
} finally {
dialog.dialogPane.isDisable = false
}
}.ui { it ->
val stx: SignedTransaction = it.stx
val type = when (command) {

View File

@ -1,6 +1,5 @@
package net.corda.loadtest
import com.google.common.util.concurrent.ListenableFuture
import com.jcraft.jsch.ChannelExec
import com.jcraft.jsch.Session
import net.corda.client.rpc.CordaRPCClient
@ -14,6 +13,7 @@ import net.corda.nodeapi.internal.addShutdownHook
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.io.OutputStream
import java.util.concurrent.Future
/**
* [NodeConnection] allows executing remote shell commands on the node as well as executing RPCs.
@ -85,7 +85,7 @@ class NodeConnection(val remoteNode: RemoteNode, private val jSchSession: Sessio
return ShellCommandOutput(command, exitCode, stdoutStream.toString(), stderrStream.toString())
}
private fun runShellCommand(command: String, stdout: OutputStream, stderr: OutputStream): ListenableFuture<Int> {
private fun runShellCommand(command: String, stdout: OutputStream, stderr: OutputStream): Future<Int> {
log.info("Running '$command' on ${remoteNode.hostname}")
return future {
val (exitCode, _) = withChannelExec(command) { channel ->

View File

@ -7,10 +7,9 @@ import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Issued
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.USD
import net.corda.core.failure
import net.corda.core.identity.AbstractParty
import net.corda.core.thenMatch
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.success
import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeConnection
@ -207,12 +206,11 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
execute = { command ->
val result = command.command.startFlow(command.node.proxy).returnValue
result.failure {
log.error("Failure[$command]", it)
}
result.success {
result.thenMatch({
log.info("Success[$command]: $result")
}
}, {
log.error("Failure[$command]", it)
})
},
gatherRemoteState = { previousState ->

View File

@ -9,7 +9,7 @@ import net.corda.contracts.asset.DUMMY_CASH_ISSUER_KEY
import net.corda.testing.contracts.DummyContract
import net.corda.core.flows.FlowException
import net.corda.core.messaging.startFlow
import net.corda.core.success
import net.corda.core.thenMatch
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.FinalityFlow
import net.corda.loadtest.LoadTest
@ -42,9 +42,9 @@ val dummyNotarisationTest = LoadTest<NotariseCommand, Unit>(
try {
val proxy = node.proxy
val issueFlow = proxy.startFlow(::FinalityFlow, issueTx)
issueFlow.returnValue.success {
issueFlow.returnValue.thenMatch({
val moveFlow = proxy.startFlow(::FinalityFlow, moveTx)
}
}, {})
} catch (e: FlowException) {
log.error("Failure", e)
}

View File

@ -3,11 +3,10 @@ package net.corda.loadtest.tests
import net.corda.client.mock.Generator
import net.corda.core.contracts.Amount
import net.corda.core.contracts.USD
import net.corda.core.failure
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.thenMatch
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.flows.CashFlowCommand
import net.corda.loadtest.LoadTest
@ -25,12 +24,11 @@ object StabilityTest {
interpret = { _, _ -> },
execute = { command ->
val result = command.command.startFlow(command.node.proxy).returnValue
result.failure {
log.error("Failure[$command]", it)
}
result.success {
result.thenMatch({
log.info("Success[$command]: $result")
}
}, {
log.error("Failure[$command]", it)
})
},
gatherRemoteState = {}
)