Remove ListenableFuture from core/main (#1030)

This commit is contained in:
Andrzej Cichocki 2017-08-07 14:31:24 +01:00 committed by GitHub
parent d9eb9fe64f
commit 3a3ead2dfe
128 changed files with 1012 additions and 624 deletions

View File

@ -11,7 +11,6 @@ import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
@ -22,6 +21,7 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashExitFlow
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow

View File

@ -1,6 +1,6 @@
package net.corda.client.rpc;
import com.google.common.util.concurrent.ListenableFuture;
import net.corda.core.concurrent.CordaFuture;
import net.corda.client.rpc.internal.RPCClient;
import net.corda.core.contracts.Amount;
import net.corda.core.messaging.CordaRPCOps;
@ -46,7 +46,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest {
@Before
public void setUp() throws ExecutionException, InterruptedException {
Set<ServiceInfo> services = new HashSet<>(Collections.singletonList(new ServiceInfo(ValidatingNotaryService.Companion.getType(), null)));
ListenableFuture<Node> nodeFuture = startNode(getALICE().getName(), 1, services, Arrays.asList(rpcUser), Collections.emptyMap());
CordaFuture<Node> nodeFuture = startNode(getALICE().getName(), 1, services, Arrays.asList(rpcUser), Collections.emptyMap());
node = nodeFuture.get();
client = new CordaRPCClient(node.getConfiguration().getRpcAddress(), null, getDefault(), false);
}

View File

@ -6,10 +6,10 @@ import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.USD
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowInitiator
import net.corda.core.getOrThrow
import net.corda.core.messaging.*
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashException
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow

View File

@ -1,17 +1,13 @@
package net.corda.client.rpc
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.crypto.random63BitValue
import net.corda.core.future
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.testing.*
@ -24,10 +20,7 @@ import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests {
@ -233,9 +226,7 @@ class RPCStabilityTests {
assertEquals("pong", client.ping())
serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
val pingFuture = future {
client.ping()
}
val pingFuture = ForkJoinPool.commonPool().fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
@ -269,9 +260,9 @@ class RPCStabilityTests {
).get()
val numberOfClients = 4
val clients = Futures.allAsList((1 .. numberOfClients).map {
val clients = (1 .. numberOfClients).map {
startRandomRpcClient<TrackSubscriberOps>(server.broker.hostAndPort!!)
}).get()
}.transpose().get()
// Poll until all clients connect
pollUntilClientNumber(server, numberOfClients)

View File

@ -12,7 +12,6 @@ import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.internal.ThreadBox
import net.corda.core.crypto.random63BitValue
import net.corda.core.getOrThrow
import net.corda.core.internal.LazyPool
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
@ -112,7 +111,7 @@ class RPCClientProxyHandler(
private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<RPCApi.ObservableId, UnicastSubject<Notification<Any>>> {
val onObservableRemove = RemovalListener<RPCApi.ObservableId, UnicastSubject<Notification<*>>> {
val rpcCallSite = callSiteMap?.remove(it.key.toLong)
if (it.cause == RemovalCause.COLLECTED) {
log.warn(listOf(
@ -337,7 +336,7 @@ class RPCClientProxyHandler(
}
}
private typealias RpcObservableMap = Cache<RPCApi.ObservableId, UnicastSubject<Notification<Any>>>
private typealias RpcObservableMap = Cache<RPCApi.ObservableId, UnicastSubject<Notification<*>>>
private typealias RpcReplyMap = ConcurrentHashMap<RPCApi.RpcRequestId, SettableFuture<Any?>>
private typealias CallSiteMap = ConcurrentHashMap<Long, Throwable?>
@ -356,18 +355,17 @@ data class ObservableContext(
/**
* A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
*/
object RpcClientObservableSerializer : Serializer<Observable<Any>>() {
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
private object RpcObservableContextKey
fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext {
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
}
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
@Suppress("UNCHECKED_CAST")
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
val observableId = RPCApi.ObservableId(input.readLong(true))
val observable = UnicastSubject.create<Notification<Any>>()
val observable = UnicastSubject.create<Notification<*>>()
require(observableContext.observableMap.getIfPresent(observableId) == null) {
"Multiple Observables arrived with the same ID $observableId"
}
@ -384,7 +382,7 @@ object RpcClientObservableSerializer : Serializer<Observable<Any>>() {
}.dematerialize()
}
override fun write(kryo: Kryo, output: Output, observable: Observable<Any>) {
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
throw UnsupportedOperationException("Cannot serialise Observables on the client side")
}

View File

@ -9,13 +9,13 @@ import net.corda.contracts.getCashBalance
import net.corda.contracts.getCashBalances
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.getOrThrow
import net.corda.core.internal.InputStreamAndHash
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import net.corda.flows.CashIssueFlow

View File

@ -1,8 +1,8 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.flatMap
import net.corda.core.map
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.User
@ -44,13 +44,13 @@ open class AbstractRPCTest {
startInVmRpcClient<I>(rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) })
}
}.get()
}
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server ->
startRpcClient<I>(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
}
}.get()
}
}
}.get()
}
}

View File

@ -1,11 +1,11 @@
package net.corda.client.rpc
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.getOrThrow
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.messaging.RPCOps
import net.corda.core.thenMatch
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.getRpcContext
import net.corda.nodeapi.RPCSinceVersion
import net.corda.testing.RPCDriverExposedDSLInterface
@ -43,9 +43,9 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
fun makeComplicatedObservable(): Observable<Pair<String, Observable<String>>>
fun makeListenableFuture(): ListenableFuture<Int>
fun makeListenableFuture(): CordaFuture<Int>
fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>>
fun makeComplicatedListenableFuture(): CordaFuture<Pair<String, CordaFuture<String>>>
@RPCSinceVersion(2)
fun addedLater()
@ -54,7 +54,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
}
private lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
private lateinit var complicatedListenableFuturee: ListenableFuture<Pair<String, ListenableFuture<String>>>
private lateinit var complicatedListenableFuturee: CordaFuture<Pair<String, CordaFuture<String>>>
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
@ -62,9 +62,9 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
override fun void() {}
override fun someCalculation(str: String, num: Int) = "$str $num"
override fun makeObservable(): Observable<Int> = Observable.just(1, 2, 3, 4)
override fun makeListenableFuture(): ListenableFuture<Int> = Futures.immediateFuture(1)
override fun makeListenableFuture() = doneFuture(1)
override fun makeComplicatedObservable() = complicatedObservable
override fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>> = complicatedListenableFuturee
override fun makeComplicatedListenableFuture() = complicatedListenableFuturee
override fun addedLater(): Unit = throw IllegalStateException()
override fun captureUser(): String = getRpcContext().currentUser.username
}
@ -152,10 +152,10 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
fun `complex ListenableFuture`() {
rpcDriver {
val proxy = testProxy()
val serverQuote = SettableFuture.create<Pair<String, ListenableFuture<String>>>()
val serverQuote = openFuture<Pair<String, CordaFuture<String>>>()
complicatedListenableFuturee = serverQuote
val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.")
val twainQuote = "Mark Twain" to doneFuture("I have never let my schooling interfere with my education.")
val clientQuotes = LinkedBlockingQueue<String>()
val clientFuture = proxy.makeComplicatedListenableFuture()

View File

@ -1,10 +1,10 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.future
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.serialization.CordaSerializable
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.RPCDriverExposedDSLInterface
@ -17,6 +17,7 @@ import rx.subjects.UnicastSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool
@RunWith(Parameterized::class)
class RPCConcurrencyTests : AbstractRPCTest() {
@ -68,7 +69,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
Observable.empty<ObservableRose<Int>>()
} else {
val publish = UnicastSubject.create<ObservableRose<Int>>()
future {
ForkJoinPool.commonPool().fork {
(1..branchingFactor).toList().parallelStream().forEach {
publish.onNext(getParallelObservableTree(depth - 1, branchingFactor))
}
@ -105,7 +106,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
val done = CountDownLatch(numberOfBlockedCalls)
// Start a couple of blocking RPC calls
(1..numberOfBlockedCalls).forEach {
future {
ForkJoinPool.commonPool().fork {
proxy.ops.waitLatch(id)
done.countDown()
}

View File

@ -2,55 +2,15 @@
package net.corda.core
import com.google.common.util.concurrent.*
import org.slf4j.Logger
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import rx.Observable
import rx.Observer
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
// TODO Delete this file once the Future stuff is out of here
/** Same as [Future.get] but with a more descriptive name, and doesn't throw [ExecutionException], instead throwing its cause */
fun <T> Future<T>.getOrThrow(timeout: Duration? = null): T {
return try {
if (timeout == null) get() else get(timeout.toNanos(), TimeUnit.NANOSECONDS)
} catch (e: ExecutionException) {
throw e.cause!!
}
}
fun <V> future(block: () -> V): Future<V> = CompletableFuture.supplyAsync(block)
fun <F : ListenableFuture<*>, V> F.then(block: (F) -> V) = addListener(Runnable { block(this) }, MoreExecutors.directExecutor())
fun <U, V> Future<U>.match(success: (U) -> V, failure: (Throwable) -> V): V {
return success(try {
getOrThrow()
} catch (t: Throwable) {
return failure(t)
})
}
fun <U, V, W> ListenableFuture<U>.thenMatch(success: (U) -> V, failure: (Throwable) -> W) = then { it.match(success, failure) }
fun ListenableFuture<*>.andForget(log: Logger) = then { it.match({}, { log.error("Background task failed:", it) }) }
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, { (mapper as (F?) -> T)(it) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
inline fun <T> SettableFuture<T>.catch(block: () -> T) {
try {
set(block())
} catch (t: Throwable) {
setException(t)
}
}
fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
fun <A> CordaFuture<out A>.toObservable(): Observable<A> {
return Observable.create { subscriber ->
thenMatch({
subscriber.onNext(it)
@ -62,26 +22,25 @@ fun <A> ListenableFuture<out A>.toObservable(): Observable<A> {
}
/**
* Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* Returns a [CordaFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then
* it will unsubscribe from the observable.
*/
fun <T> Observable<T>.toFuture(): ListenableFuture<T> = ObservableToFuture(this)
fun <T> Observable<T>.toFuture(): CordaFuture<T> = openFuture<T>().also {
val subscription = first().subscribe(object : Observer<T> {
override fun onNext(value: T) {
it.set(value)
}
private class ObservableToFuture<T>(observable: Observable<T>) : AbstractFuture<T>(), Observer<T> {
private val subscription = observable.first().subscribe(this)
override fun onNext(value: T) {
set(value)
override fun onError(e: Throwable) {
it.setException(e)
}
override fun onCompleted() {}
})
it.then {
if (it.isCancelled) {
subscription.unsubscribe()
}
}
override fun onError(e: Throwable) {
setException(e)
}
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
subscription.unsubscribe()
return super.cancel(mayInterruptIfRunning)
}
override fun onCompleted() {}
}

View File

@ -1,34 +1,44 @@
package net.corda.core.concurrent
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.utilities.getOrThrow
import net.corda.core.internal.VisibleForTesting
import net.corda.core.match
import net.corda.core.then
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
/** Invoke [getOrThrow] and pass the value/throwable to success/failure respectively. */
fun <V, W> Future<V>.match(success: (V) -> W, failure: (Throwable) -> W): W {
val value = try {
getOrThrow()
} catch (t: Throwable) {
return failure(t)
}
return success(value)
}
/**
* As soon as a given future becomes done, the handler is invoked with that future as its argument.
* The result of the handler is copied into the result future, and the handler isn't invoked again.
* If a given future errors after the result future is done, the error is automatically logged.
*/
fun <S, T> firstOf(vararg futures: ListenableFuture<out S>, handler: (ListenableFuture<out S>) -> T) = firstOf(futures, defaultLog, handler)
fun <V, W> firstOf(vararg futures: CordaFuture<out V>, handler: (CordaFuture<out V>) -> W) = firstOf(futures, defaultLog, handler)
private val defaultLog = LoggerFactory.getLogger("net.corda.core.concurrent")
@VisibleForTesting
internal val shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
internal fun <S, T> firstOf(futures: Array<out ListenableFuture<out S>>, log: Logger, handler: (ListenableFuture<out S>) -> T): ListenableFuture<T> {
val resultFuture = SettableFuture.create<T>()
internal fun <V, W> firstOf(futures: Array<out CordaFuture<out V>>, log: Logger, handler: (CordaFuture<out V>) -> W): CordaFuture<W> {
val resultFuture = openFuture<W>()
val winnerChosen = AtomicBoolean()
futures.forEach {
it.then {
if (winnerChosen.compareAndSet(false, true)) {
resultFuture.catch { handler(it) }
} else if (!it.isCancelled) {
resultFuture.capture { handler(it) }
} else if (it.isCancelled) {
// Do nothing.
} else {
it.match({}, { log.error(shortCircuitedTaskFailedMessage, it) })
}
}

View File

@ -0,0 +1,22 @@
package net.corda.core.concurrent
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
/**
* Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area.
* In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance.
*/
interface CordaFuture<V> : Future<V> {
/**
* Run the given callback when this future is done, on the completion thread.
* If the completion thread is problematic for you e.g. deadlock, you can submit to an executor manually.
* If callback fails, its throwable is logged.
*/
fun <W> then(callback: (CordaFuture<V>) -> W): Unit
/**
* @return a new [CompletableFuture] with the same outcome as this Future.
*/
fun toCompletableFuture(): CompletableFuture<V>
}

View File

@ -1,7 +1,7 @@
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
@ -37,6 +37,6 @@ interface FlowStateMachine<R> {
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId
val resultFuture: ListenableFuture<R>
val resultFuture: CordaFuture<R>
val flowInitiator: FlowInitiator
}

View File

@ -0,0 +1,146 @@
package net.corda.core.internal.concurrent
import com.google.common.annotations.VisibleForTesting
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.match
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import org.slf4j.Logger
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
/** @return a fresh [OpenFuture]. */
fun <V> openFuture(): OpenFuture<V> = CordaFutureImpl()
/** @return a done future with the given value as its outcome. */
fun <V> doneFuture(value: V): CordaFuture<V> = CordaFutureImpl<V>().apply { set(value) }
/** @return a future that will have the same outcome as the given block, when this executor has finished running it. */
fun <V> Executor.fork(block: () -> V): CordaFuture<V> = CordaFutureImpl<V>().also { execute { it.capture(block) } }
/** When this future is done, do [match]. */
fun <V, W, X> CordaFuture<out V>.thenMatch(success: (V) -> W, failure: (Throwable) -> X) = then { match(success, failure) }
/** When this future is done and the outcome is failure, log the throwable. */
fun CordaFuture<*>.andForget(log: Logger) = thenMatch({}, { log.error("Background task failed:", it) })
/**
* Returns a future that will have an outcome of applying the given transform to this future's value.
* But if this future fails, the transform is not invoked and the returned future becomes done with the same throwable.
*/
fun <V, W> CordaFuture<out V>.map(transform: (V) -> W): CordaFuture<W> = CordaFutureImpl<W>().also { result ->
thenMatch({
result.capture { transform(it) }
}, {
result.setException(it)
})
}
/**
* Returns a future that will have the same outcome as the future returned by the given transform.
* But if this future or the transform fails, the returned future's outcome is the same throwable.
* In the case where this future fails, the transform is not invoked.
*/
fun <V, W> CordaFuture<out V>.flatMap(transform: (V) -> CordaFuture<out W>): CordaFuture<W> = CordaFutureImpl<W>().also { result ->
thenMatch(success@ {
result.captureLater(try {
transform(it)
} catch (t: Throwable) {
result.setException(t)
return@success
})
}, {
result.setException(it)
})
}
/**
* If all of the given futures succeed, the returned future's outcome is a list of all their values.
* The values are in the same order as the futures in the collection, not the order of completion.
* If at least one given future fails, the returned future's outcome is the first throwable that was thrown.
* Any subsequent throwables are added to the first one as suppressed throwables, in the order they are thrown.
* If no futures were given, the returned future has an immediate outcome of empty list.
* Otherwise the returned future does not have an outcome until all given futures have an outcome.
* Unlike Guava's Futures.allAsList, this method never hides failures/hangs subsequent to the first failure.
*/
fun <V> Collection<CordaFuture<out V>>.transpose(): CordaFuture<List<V>> {
if (isEmpty()) return doneFuture(emptyList())
val transpose = CordaFutureImpl<List<V>>()
val stateLock = Any()
var failure: Throwable? = null
var remaining = size
forEach {
it.then { doneFuture ->
synchronized(stateLock) {
doneFuture.match({}, { throwable ->
if (failure == null) failure = throwable else failure!!.addSuppressed(throwable)
})
if (--remaining == 0) {
if (failure == null) transpose.set(map { it.getOrThrow() }) else transpose.setException(failure!!)
}
}
}
}
return transpose
}
/** The contravariant members of [OpenFuture]. */
interface ValueOrException<in V> {
/** @return whether this future actually changed. */
fun set(value: V): Boolean
/** @return whether this future actually changed. */
fun setException(t: Throwable): Boolean
/** When the given future has an outcome, make this future have the same outcome. */
fun captureLater(f: CordaFuture<out V>) = f.then { capture { f.getOrThrow() } }
/** Run the given block (in the foreground) and set this future to its outcome. */
fun capture(block: () -> V): Boolean {
return set(try {
block()
} catch (t: Throwable) {
return setException(t)
})
}
}
/** A [CordaFuture] with additional methods to complete it with a value, exception or the outcome of another future. */
interface OpenFuture<V> : ValueOrException<V>, CordaFuture<V>
/** Unless you really want this particular implementation, use [openFuture] to make one. */
@VisibleForTesting
internal class CordaFutureImpl<V>(private val impl: CompletableFuture<V> = CompletableFuture()) : Future<V> by impl, OpenFuture<V> {
companion object {
private val defaultLog = loggerFor<CordaFutureImpl<*>>()
internal val listenerFailedMessage = "Future listener failed:"
}
override fun set(value: V) = impl.complete(value)
override fun setException(t: Throwable) = impl.completeExceptionally(t)
override fun <W> then(callback: (CordaFuture<V>) -> W) = thenImpl(defaultLog, callback)
/** For testing only. */
internal fun <W> thenImpl(log: Logger, callback: (CordaFuture<V>) -> W) {
impl.whenComplete { _, _ ->
try {
callback(this)
} catch (t: Throwable) {
log.error(listenerFailedMessage, t)
}
}
}
// We don't simply return impl so that the caller can't interfere with it.
override fun toCompletableFuture() = CompletableFuture<V>().also { completable ->
thenMatch({
completable.complete(it)
}, {
completable.completeExceptionally(it)
})
}
}
internal fun <V> Future<V>.get(timeout: Duration? = null): V = if (timeout == null) get() else get(timeout.toNanos(), TimeUnit.NANOSECONDS)

View File

@ -1,6 +1,6 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UpgradedContract
@ -233,11 +233,11 @@ interface CordaRPCOps : RPCOps {
fun currentNodeTime(): Instant
/**
* Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also
* Returns a [CordaFuture] which completes when the node has registered wih the network map service. It can also
* complete with an exception if it is unable to.
*/
@RPCReturnsObservables
fun waitUntilRegisteredWithNetworkMap(): ListenableFuture<Unit>
fun waitUntilRegisteredWithNetworkMap(): CordaFuture<Void?>
// TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
// the node's state locally and query that directly.

View File

@ -1,6 +1,6 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.CordaSerializable
import rx.Observable
@ -9,11 +9,11 @@ import rx.Observable
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
*
* @property id The started state machine's ID.
* @property returnValue A [ListenableFuture] of the flow's return value.
* @property returnValue A [CordaFuture] of the flow's return value.
*/
interface FlowHandle<A> : AutoCloseable {
val id: StateMachineRunId
val returnValue: ListenableFuture<A>
val returnValue: CordaFuture<A>
/**
* Use this function for flows whose returnValue is not going to be used, so as to free up server resources.
@ -41,7 +41,7 @@ interface FlowProgressHandle<A> : FlowHandle<A> {
@CordaSerializable
data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
override val returnValue: CordaFuture<A>) : FlowHandle<A> {
// Remember to add @Throws to FlowHandle.close() if this throws an exception.
override fun close() {
@ -52,7 +52,7 @@ data class FlowHandleImpl<A>(
@CordaSerializable
data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val returnValue: CordaFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.

View File

@ -1,6 +1,6 @@
package net.corda.core.node.services
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Contract
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
@ -44,7 +44,7 @@ interface NetworkMapCache {
/** Tracks changes to the network map cache */
val changed: Observable<MapChange>
/** Future to track completion of the NetworkMapService registration. */
val mapServiceRegistered: ListenableFuture<Unit>
val mapServiceRegistered: CordaFuture<Void?>
/**
* Atomically get the current party nodes and a stream of updates. Note that the Observable buffers updates until the

View File

@ -1,6 +1,6 @@
package net.corda.core.node.services
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.transactions.LedgerTransaction
/**
@ -11,5 +11,5 @@ interface TransactionVerifierService {
* @param transaction The transaction to be verified.
* @return A future that completes successfully if the transaction verified, or sets an exception the verifier threw.
*/
fun verify(transaction: LedgerTransaction): ListenableFuture<*>
fun verify(transaction: LedgerTransaction): CordaFuture<*>
}

View File

@ -1,7 +1,7 @@
package net.corda.core.node.services
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
@ -190,9 +190,9 @@ interface VaultService {
fun notify(tx: CoreTransaction) = notifyAll(listOf(tx))
/**
* Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests.
* Provide a [CordaFuture] for when a [StateRef] is consumed, which can be very useful in building tests.
*/
fun whenConsumed(ref: StateRef): ListenableFuture<Vault.Update<ContractState>> {
fun whenConsumed(ref: StateRef): CordaFuture<Vault.Update<ContractState>> {
return updates.filter { it.consumed.any { it.ref == ref } }.toFuture()
}

View File

@ -3,13 +3,13 @@ package net.corda.core.transactions
import net.corda.core.contracts.*
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import java.security.PublicKey
import java.security.SignatureException
import java.util.*

View File

@ -1,9 +1,12 @@
package net.corda.core.utilities
import net.corda.core.internal.concurrent.get
import net.corda.core.serialization.CordaSerializable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import kotlin.reflect.KProperty
//
@ -93,3 +96,10 @@ class TransientProperty<out T>(private val initialiser: () -> T) {
/** @see NonEmptySet.copyOf */
fun <T> Collection<T>.toNonEmptySet(): NonEmptySet<T> = NonEmptySet.copyOf(this)
/** Same as [Future.get] except that the [ExecutionException] is unwrapped. */
fun <V> Future<V>.getOrThrow(timeout: Duration? = null): V = try {
get(timeout)
} catch (e: ExecutionException) {
throw e.cause!!
}

View File

@ -0,0 +1,100 @@
package net.corda.core.concurrent;
import net.corda.core.internal.concurrent.OpenFuture;
import org.junit.Test;
import java.io.EOFException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static net.corda.core.internal.concurrent.CordaFutureImplKt.doneFuture;
import static net.corda.core.internal.concurrent.CordaFutureImplKt.openFuture;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CordaFutureInJavaTest {
@Test
public void methodsAreNotTooAwkwardToUse() throws InterruptedException, ExecutionException {
{
CordaFuture<Number> f = openFuture();
f.cancel(false);
assertTrue(f.isCancelled());
}
{
CordaFuture<Number> f = openFuture();
assertThatThrownBy(() -> f.get(1, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
}
{
CordaFuture<Number> f = doneFuture(100);
assertEquals(100, f.get());
}
{
Future<Integer> f = doneFuture(100);
assertEquals(Integer.valueOf(100), f.get());
}
{
OpenFuture<Number> f = openFuture();
OpenFuture<Number> g = openFuture();
f.then(done -> {
try {
return g.set(done.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
f.set(100);
assertEquals(100, g.get());
}
}
@Test
public void toCompletableFutureWorks() throws InterruptedException, ExecutionException {
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
f.set(100);
assertEquals(100, g.get());
}
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
EOFException e = new EOFException();
f.setException(e);
assertThatThrownBy(g::get).hasCause(e);
}
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
f.cancel(false);
assertTrue(g.isCancelled());
}
}
@Test
public void toCompletableFutureDoesNotHaveThePowerToAffectTheUnderlyingFuture() {
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
g.complete(100);
assertFalse(f.isDone());
}
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
g.completeExceptionally(new EOFException());
assertFalse(f.isDone());
}
{
OpenFuture<Number> f = openFuture();
CompletableFuture<Number> g = f.toCompletableFuture();
g.cancel(false);
// For now let's do the most conservative thing i.e. nothing:
assertFalse(f.isDone());
}
}
}

View File

@ -1,18 +1,11 @@
package net.corda.core
import com.google.common.util.concurrent.MoreExecutors
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.same
import com.nhaarman.mockito_kotlin.verify
import net.corda.core.utilities.getOrThrow
import org.assertj.core.api.Assertions.*
import org.junit.Test
import org.mockito.ArgumentMatchers.anyString
import org.slf4j.Logger
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.CancellationException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class UtilsTest {
@Test
@ -65,17 +58,4 @@ class UtilsTest {
future.get()
}
}
@Test
fun `andForget works`() {
val log = mock<Logger>()
val throwable = Exception("Boom")
val executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
executor.submit { throw throwable }.andForget(log)
executor.shutdown()
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// Do nothing.
}
verify(log).error(anyString(), same(throwable))
}
}

View File

@ -1,21 +1,22 @@
package net.corda.core.concurrent
import com.google.common.util.concurrent.SettableFuture
import com.nhaarman.mockito_kotlin.*
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.utilities.getOrThrow
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import org.slf4j.Logger
import java.io.EOFException
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class ConcurrencyUtilsTest {
private val f1 = SettableFuture.create<Int>()
private val f2 = SettableFuture.create<Double>()
private val f1 = openFuture<Int>()
private val f2 = openFuture<Double>()
private var invocations = 0
private val log: Logger = mock<Logger>()
private val log = mock<Logger>()
@Test
fun `firstOf short circuit`() {
// Order not significant in this case:
@ -31,6 +32,7 @@ class ConcurrencyUtilsTest {
f2.setException(throwable)
assertEquals(1, invocations) // Least astonishing to skip handler side-effects.
verify(log).error(eq(shortCircuitedTaskFailedMessage), same(throwable))
verifyNoMoreInteractions(log)
}
@Test
@ -48,20 +50,24 @@ class ConcurrencyUtilsTest {
assertTrue(f2.isCancelled)
}
/**
* Note that if you set CancellationException on CompletableFuture it will report isCancelled.
*/
@Test
fun `firstOf re-entrant handler attempt not due to cancel`() {
val futures = arrayOf(f1, f2)
val fakeCancel = CancellationException()
val nonCancel = IllegalStateException()
val g = firstOf(futures, log) {
++invocations
futures.forEach { it.setException(fakeCancel) } // One handler attempt here.
futures.forEach { it.setException(nonCancel) } // One handler attempt here.
it.getOrThrow()
}
f1.set(100)
assertEquals(100, g.getOrThrow())
assertEquals(1, invocations) // Handler didn't run as g was already done.
verify(log).error(eq(shortCircuitedTaskFailedMessage), same(fakeCancel))
assertThatThrownBy { f2.getOrThrow() }.isSameAs(fakeCancel)
verify(log).error(eq(shortCircuitedTaskFailedMessage), same(nonCancel))
verifyNoMoreInteractions(log)
assertThatThrownBy { f2.getOrThrow() }.isSameAs(nonCancel)
}
@Test
@ -75,4 +81,37 @@ class ConcurrencyUtilsTest {
assertEquals(1, invocations)
verifyNoMoreInteractions(log)
}
@Test
fun `match does not pass failure of success block into the failure block`() {
val f = CompletableFuture.completedFuture(100)
val successes = mutableListOf<Any?>()
val failures = mutableListOf<Any?>()
val x = Throwable()
assertThatThrownBy {
f.match({
successes.add(it)
throw x
}, failures::add)
}.isSameAs(x)
assertEquals(listOf<Any?>(100), successes)
assertEquals(emptyList<Any?>(), failures)
}
@Test
fun `match does not pass ExecutionException to failure block`() {
val e = Throwable()
val f = CompletableFuture<Void>().apply { completeExceptionally(e) }
val successes = mutableListOf<Any?>()
val failures = mutableListOf<Any?>()
val x = Throwable()
assertThatThrownBy {
f.match(successes::add, {
failures.add(it)
throw x
})
}.isSameAs(x)
assertEquals(emptyList<Any?>(), successes)
assertEquals(listOf<Any?>(e), failures)
}
}

View File

@ -4,12 +4,12 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.getOrThrow
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.FetchAttachmentsFlow
import net.corda.core.internal.FetchDataFlow
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.database.RequeryConfiguration
import net.corda.node.services.network.NetworkMapService

View File

@ -3,13 +3,13 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Command
import net.corda.core.contracts.requireThat
import net.corda.core.getOrThrow
import net.corda.testing.contracts.DummyContract
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.unwrap
import net.corda.testing.MINI_CORP_KEY
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockServices
import org.junit.After

View File

@ -4,7 +4,6 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
@ -14,6 +13,7 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.internal.Emoji
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashIssueFlow
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.startFlowPermission

View File

@ -4,9 +4,9 @@ import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
import net.corda.core.contracts.Issued
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockServices
import org.junit.After

View File

@ -4,9 +4,9 @@ import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
import net.corda.core.contracts.Issued
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockServices
import org.junit.After

View File

@ -1,9 +1,9 @@
package net.corda.core.flows
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousPartyAndPath
import net.corda.core.identity.Party
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_NOTARY

View File

@ -6,9 +6,9 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.TestDataVendingFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.sequence
import net.corda.testing.DUMMY_NOTARY_KEY
import net.corda.testing.MEGA_CORP

View File

@ -0,0 +1,163 @@
package net.corda.core.internal.concurrent
import com.nhaarman.mockito_kotlin.*
import net.corda.core.concurrent.CordaFuture
import net.corda.core.utilities.getOrThrow
import org.assertj.core.api.Assertions
import org.junit.Test
import org.slf4j.Logger
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class CordaFutureTest {
@Test
fun `fork works`() {
val e = Executors.newSingleThreadExecutor()
try {
assertEquals(100, e.fork { 100 }.getOrThrow())
val x = Exception()
val f = e.fork { throw x }
Assertions.assertThatThrownBy { f.getOrThrow() }.isSameAs(x)
} finally {
e.shutdown()
}
}
@Test
fun `if a listener fails its throwable is logged`() {
val f = CordaFutureImpl<Int>()
val x = Exception()
val log = mock<Logger>()
val flag = AtomicBoolean()
f.thenImpl(log) { throw x }
f.thenImpl(log) { flag.set(true) } // Must not be affected by failure of previous listener.
f.set(100)
verify(log).error(eq(CordaFutureImpl.listenerFailedMessage), same(x))
verifyNoMoreInteractions(log)
assertTrue(flag.get())
}
@Test
fun `map works`() {
run {
val f = CordaFutureImpl<Int>()
val g = f.map { it * 2 }
f.set(100)
assertEquals(200, g.getOrThrow())
}
run {
val f = CordaFutureImpl<Int>()
val x = Exception()
val g = f.map { throw x }
f.set(100)
Assertions.assertThatThrownBy { g.getOrThrow() }.isSameAs(x)
}
run {
val block = mock<(Any?) -> Any?>()
val f = CordaFutureImpl<Int>()
val g = f.map(block)
val x = Exception()
f.setException(x)
Assertions.assertThatThrownBy { g.getOrThrow() }.isSameAs(x)
verifyNoMoreInteractions(block)
}
}
@Test
fun `flatMap works`() {
run {
val f = CordaFutureImpl<Int>()
val g = f.flatMap { CordaFutureImpl<Int>().apply { set(it * 2) } }
f.set(100)
assertEquals(200, g.getOrThrow())
}
run {
val f = CordaFutureImpl<Int>()
val x = Exception()
val g = f.flatMap { CordaFutureImpl<Void>().apply { setException(x) } }
f.set(100)
Assertions.assertThatThrownBy { g.getOrThrow() }.isSameAs(x)
}
run {
val f = CordaFutureImpl<Int>()
val x = Exception()
val g: CordaFuture<Void> = f.flatMap { throw x }
f.set(100)
Assertions.assertThatThrownBy { g.getOrThrow() }.isSameAs(x)
}
run {
val block = mock<(Any?) -> CordaFuture<*>>()
val f = CordaFutureImpl<Int>()
val g = f.flatMap(block)
val x = Exception()
f.setException(x)
Assertions.assertThatThrownBy { g.getOrThrow() }.isSameAs(x)
verifyNoMoreInteractions(block)
}
}
@Test
fun `andForget works`() {
val log = mock<Logger>()
val throwable = Exception("Boom")
val executor = Executors.newSingleThreadExecutor()
executor.fork { throw throwable }.andForget(log)
executor.shutdown()
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// Do nothing.
}
verify(log).error(any(), same(throwable))
}
}
class TransposeTest {
private val a = openFuture<Int>()
private val b = openFuture<Int>()
private val c = openFuture<Int>()
private val f = listOf(a, b, c).transpose()
@Test
fun `transpose empty collection`() {
assertEquals(emptyList(), emptyList<CordaFuture<*>>().transpose().getOrThrow())
}
@Test
fun `transpose values are in the same order as the collection of futures`() {
b.set(2)
c.set(3)
assertFalse(f.isDone)
a.set(1)
assertEquals(listOf(1, 2, 3), f.getOrThrow())
}
@Test
fun `transpose throwables are reported in the order they were thrown`() {
val ax = Exception()
val bx = Exception()
val cx = Exception()
b.setException(bx)
c.setException(cx)
assertFalse(f.isDone)
a.setException(ax)
Assertions.assertThatThrownBy { f.getOrThrow() }.isSameAs(bx)
assertEquals(listOf(cx, ax), bx.suppressed.asList())
assertEquals(emptyList(), ax.suppressed.asList())
assertEquals(emptyList(), cx.suppressed.asList())
}
@Test
fun `transpose mixture of outcomes`() {
val bx = Exception()
val cx = Exception()
b.setException(bx)
c.setException(cx)
assertFalse(f.isDone)
a.set(100) // Discarded.
Assertions.assertThatThrownBy { f.getOrThrow() }.isSameAs(bx)
assertEquals(listOf(cx), bx.suppressed.asList())
assertEquals(emptyList(), cx.suppressed.asList())
}
}

View File

@ -5,7 +5,6 @@ import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.FetchAttachmentsFlow
import net.corda.core.internal.FetchDataFlow
@ -13,13 +12,13 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
import net.corda.node.services.statemachine.SessionInit
import net.corda.core.flows.DataVendingFlow
import net.corda.core.flows.TestDataVendingFlow
import net.corda.testing.node.MockNetwork
import org.junit.After

View File

@ -1,25 +1,22 @@
package net.corda.docs
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.ContractState
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.DOLLARS
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.OpaqueBytes
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_NOTARY
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.nodeapi.User
import net.corda.testing.*
import net.corda.testing.driver.driver
import net.corda.testing.expect
import net.corda.testing.expectEvents
@ -41,11 +38,11 @@ class IntegrationTestingTutorial {
val bobUser = User("bobUser", "testPassword2", permissions = setOf(
startFlowPermission<CashPaymentFlow>()
))
val (alice, bob, notary) = Futures.allAsList(
val (alice, bob, notary) = listOf(
startNode(ALICE.name, rpcUsers = listOf(aliceUser)),
startNode(BOB.name, rpcUsers = listOf(bobUser)),
startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)))
).getOrThrow()
).transpose().getOrThrow()
// END 1
// START 2
@ -66,7 +63,7 @@ class IntegrationTestingTutorial {
// START 4
val issueRef = OpaqueBytes.of(0)
val futures = Stack<ListenableFuture<*>>()
val futures = Stack<CordaFuture<*>>()
(1..10).map { i ->
thread {
futures.push(aliceProxy.startFlow(::CashIssueFlow,

View File

@ -2,9 +2,9 @@ package net.corda.docs
import net.corda.contracts.getCashBalances
import net.corda.core.contracts.*
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashIssueFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService

View File

@ -2,10 +2,10 @@ package net.corda.docs
import net.corda.contracts.getCashBalances
import net.corda.core.contracts.*
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.toFuture
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashIssueFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService

View File

@ -3,13 +3,13 @@ package net.corda.docs
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.getOrThrow
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.toFuture
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_NOTARY_KEY
import net.corda.node.services.network.NetworkMapService

View File

@ -3,9 +3,9 @@ package net.corda.flows
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.`issued by`
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode

View File

@ -3,9 +3,9 @@ package net.corda.flows
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.`issued by`
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode

View File

@ -3,12 +3,12 @@ package net.corda.flows
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.`issued by`
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.node.services.trackBy
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin

View File

@ -1,20 +1,20 @@
package net.corda.flows
import com.google.common.util.concurrent.ListenableFuture
import net.corda.contracts.asset.Cash
import net.corda.core.concurrent.CordaFuture
import net.corda.testing.contracts.calculateRandomlySizedAmounts
import net.corda.core.contracts.Amount
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.currency
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.node.services.trackBy
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.contracts.calculateRandomlySizedAmounts
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.node.MockNetwork
@ -159,7 +159,7 @@ class IssuerFlowTest(val anonymous: Boolean) {
amount: Amount<Currency>,
issueToParty: Party,
ref: OpaqueBytes,
notaryParty: Party): ListenableFuture<AbstractCashFlow.Result> {
notaryParty: Party): CordaFuture<AbstractCashFlow.Result> {
val issueToPartyAndRef = issueToParty.ref(ref)
val issueRequest = IssuanceRequester(amount, issueToParty, issueToPartyAndRef.reference, issuerNode.info.legalIdentity, notaryParty,
anonymous)

View File

@ -162,7 +162,7 @@ object RPCApi {
data class Observation(
val id: ObservableId,
val content: Notification<Any>
val content: Notification<*>
) : ServerToClient() {
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVATION.ordinal)

View File

@ -4,7 +4,7 @@ package net.corda.nodeapi
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.CordaRuntimeException
import net.corda.core.serialization.*
import net.corda.core.toFuture
@ -45,16 +45,15 @@ class PermissionException(msg: String) : RuntimeException(msg)
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
class RPCKryo(observableSerializer: Serializer<Observable<Any>>, whitelist: ClassWhitelist) : CordaKryo(CordaClassResolver(whitelist)) {
class RPCKryo(observableSerializer: Serializer<Observable<*>>, whitelist: ClassWhitelist) : CordaKryo(CordaClassResolver(whitelist)) {
init {
DefaultKryoCustomizer.customize(this)
// RPC specific classes
register(InputStream::class.java, InputStreamSerializer)
register(Observable::class.java, observableSerializer)
@Suppress("UNCHECKED_CAST")
register(ListenableFuture::class,
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java as Class<Observable<Any>>).toFuture() },
register(CordaFuture::class,
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() },
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
)
}
@ -66,8 +65,8 @@ class RPCKryo(observableSerializer: Serializer<Observable<Any>>, whitelist: Clas
if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) {
return super.getRegistration(InputStream::class.java)
}
if (ListenableFuture::class.java != type && ListenableFuture::class.java.isAssignableFrom(type)) {
return super.getRegistration(ListenableFuture::class.java)
if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) {
return super.getRegistration(CordaFuture::class.java)
}
type.requireExternal("RPC not allowed to deserialise internal classes")
return super.getRegistration(type)

View File

@ -4,10 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.internal.div
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.node.internal.NodeStartup
import net.corda.node.services.startFlowPermission

View File

@ -1,14 +1,14 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.Futures
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.unwrap
@ -24,9 +24,9 @@ class CordappScanningDriverTest {
val user = User("u", "p", setOf(startFlowPermission<ReceiveFlow>()))
// The driver will automatically pick up the annotated flows below
driver {
val (alice, bob) = Futures.allAsList(
val (alice, bob) = listOf(
startNode(ALICE.name, rpcUsers = listOf(user)),
startNode(BOB.name)).getOrThrow()
startNode(BOB.name)).transpose().getOrThrow()
val initiatedFlowClass = alice.rpcClientToNode()
.start(user.username, user.password)
.proxy

View File

@ -2,10 +2,10 @@ package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import com.google.common.util.concurrent.Futures
import net.corda.core.contracts.DOLLARS
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.minutes
import net.corda.core.node.services.ServiceInfo
@ -113,7 +113,7 @@ class NodePerformanceTests {
val doneFutures = (1..100).toList().parallelStream().map {
connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), a.nodeInfo.legalIdentity, a.nodeInfo.notaryIdentity).returnValue
}.toList()
Futures.allAsList(doneFutures).get()
doneFutures.transpose().get()
println("STARTING PAYMENT")
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, a.nodeInfo.legalIdentity).returnValue.get()

View File

@ -9,12 +9,12 @@ import net.corda.core.internal.div
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.AbstractNode
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.network.NetworkMapService

View File

@ -2,7 +2,6 @@ package net.corda.node.services
import net.corda.core.contracts.Amount
import net.corda.core.contracts.POUNDS
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.CordaRPCOps
@ -10,6 +9,9 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.DUMMY_NOTARY
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.transactions.RaftValidatingNotaryService

View File

@ -1,18 +1,18 @@
package net.corda.node.services
import com.google.common.util.concurrent.Futures
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.testing.contracts.DummyContract
import net.corda.core.identity.Party
import net.corda.testing.DUMMY_BANK_A
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.map
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.internal.AbstractNode
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.NodeBasedTest
import org.bouncycastle.asn1.x500.X500Name
import org.junit.Test
@ -25,10 +25,10 @@ class RaftNotaryServiceTests : NodeBasedTest() {
@Test
fun `detect double spend`() {
val (bankA) = Futures.allAsList(
val (bankA) = listOf(
startNode(DUMMY_BANK_A.name),
startNotaryCluster(notaryName, 3).map { it.first() }
).getOrThrow()
).transpose().getOrThrow()
val notaryParty = bankA.services.networkMapCache.getNotary(notaryName)!!

View File

@ -1,11 +1,11 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.Futures
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.unwrap
@ -16,9 +16,9 @@ import org.junit.Test
class FlowVersioningTest : NodeBasedTest() {
@Test
fun `core flows receive platform version of initiator`() {
val (alice, bob) = Futures.allAsList(
val (alice, bob) = listOf(
startNode(ALICE.name, platformVersion = 2),
startNode(BOB.name, platformVersion = 3)).getOrThrow()
startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow()
bob.installCoreFlow(ClientFlow::class, ::SendBackPlatformVersionFlow)
val resultFuture = alice.services.startFlow(ClientFlow(bob.info.legalIdentity)).resultFuture
assertThat(resultFuture.getOrThrow()).isEqualTo(2)

View File

@ -7,11 +7,11 @@ import net.corda.core.crypto.toBase58String
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.crypto.random63BitValue
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.unwrap

View File

@ -1,9 +1,8 @@
package net.corda.services.messaging
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.elapsedTime
import net.corda.core.internal.times
import net.corda.core.messaging.MessageRecipients
@ -12,6 +11,7 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.Node
import net.corda.node.services.api.DEFAULT_SESSION_ID
@ -39,7 +39,7 @@ class P2PMessagingTest : NodeBasedTest() {
@Test
fun `network map will work after restart`() {
val identities = listOf(DUMMY_BANK_A, DUMMY_BANK_B, DUMMY_NOTARY)
fun startNodes() = Futures.allAsList(identities.map { startNode(it.name) })
fun startNodes() = identities.map { startNode(it.name) }.transpose()
val startUpDuration = elapsedTime { startNodes().getOrThrow() }
// Start the network map a second time - this will restore message queues from the journal.
@ -75,7 +75,7 @@ class P2PMessagingTest : NodeBasedTest() {
DUMMY_MAP.name,
advertisedServices = setOf(distributedService),
configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val (serviceNode2, alice) = Futures.allAsList(
val (serviceNode2, alice) = listOf(
startNode(
SERVICE_2_NAME,
advertisedServices = setOf(distributedService),
@ -83,7 +83,7 @@ class P2PMessagingTest : NodeBasedTest() {
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))),
startNode(ALICE.name)
).getOrThrow()
).transpose().getOrThrow()
assertAllNodesAreUsed(listOf(networkMapNode, serviceNode2), DISTRIBUTED_SERVICE_NAME, alice)
}
@ -217,7 +217,7 @@ class P2PMessagingTest : NodeBasedTest() {
}
}
private fun Node.receiveFrom(target: MessageRecipients): ListenableFuture<Any> {
private fun Node.receiveFrom(target: MessageRecipients): CordaFuture<Any> {
val request = TestRequest(replyTo = network.myAddress)
return network.sendRequest<Any>(javaClass.name, request, target)
}

View File

@ -1,13 +1,13 @@
package net.corda.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.cert
import net.corda.core.crypto.random63BitValue
import net.corda.core.getOrThrow
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.seconds
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.sendRequest
@ -65,7 +65,7 @@ class P2PSecurityTest : NodeBasedTest() {
return SimpleNode(config, trustRoot = trustRoot).apply { start() }
}
private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): ListenableFuture<NetworkMapService.RegistrationResponse> {
private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): CordaFuture<NetworkMapService.RegistrationResponse> {
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1)
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)

View File

@ -3,17 +3,17 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.*
import net.corda.core.flatMap
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
@ -139,9 +139,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
var isPreviousCheckpointsPresent = false
private set
protected val _networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create()
protected val _networkMapRegistrationFuture = openFuture<Unit>()
/** Completes once the node has successfully registered with the network map service */
val networkMapRegistrationFuture: ListenableFuture<Unit>
val networkMapRegistrationFuture: CordaFuture<Unit>
get() = _networkMapRegistrationFuture
/** Fetch CordaPluginRegistry classes registered in META-INF/services/net.corda.core.node.CordaPluginRegistry files that exist in the classpath */
@ -212,7 +212,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
registerInitiatedFlow(IssuerFlow.Issuer::class.java)
runOnStop += network::stop
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
_networkMapRegistrationFuture.captureLater(registerWithNetworkMapIfConfigured())
smm.start()
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
@ -575,7 +575,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
}
private fun registerWithNetworkMapIfConfigured(): ListenableFuture<Unit> {
private fun registerWithNetworkMapIfConfigured(): CordaFuture<Unit> {
services.networkMapCache.addNode(info)
// In the unit test environment, we may sometimes run without any network map service
return if (networkMapAddress == null && inNodeNetworkMapService == null) {
@ -590,7 +590,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
* Register this node with the network map cache, and load network map from a remote service (and register for
* updates) if one has been supplied.
*/
protected open fun registerWithNetworkMap(): ListenableFuture<Unit> {
protected open fun registerWithNetworkMap(): CordaFuture<Unit> {
require(networkMapAddress != null || NetworkMapService.type in advertisedServices.map { it.type }) {
"Initial network map address must indicate a node that provides a network map service"
}
@ -604,7 +604,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
}
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): ListenableFuture<RegistrationResponse> {
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): CordaFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
@ -618,7 +618,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected abstract fun myAddresses(): List<NetworkHostAndPort>
/** This is overriden by the mock node implementation to enable operation without any network map service */
protected open fun noNetworkMapConfigured(): ListenableFuture<Unit> {
protected open fun noNetworkMapConfigured(): CordaFuture<Unit> {
// TODO: There should be a consistent approach to configuration error exceptions.
throw IllegalStateException("Configuration error: this node isn't being asked to act as the network map, nor " +
"has any other map node been configured.")

View File

@ -1,15 +1,15 @@
package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.flatMap
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.thenMatch
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
import net.corda.node.serialization.KryoServerSerializationScheme
@ -252,8 +252,8 @@ open class Node(override val configuration: FullNodeConfiguration,
* Insert an initial step in the registration process which will throw an exception if a non-recoverable error is
* encountered when trying to connect to the network map node.
*/
override fun registerWithNetworkMap(): ListenableFuture<Unit> {
val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: Futures.immediateFuture(Unit)
override fun registerWithNetworkMap(): CordaFuture<Unit> {
val networkMapConnection = messageBroker?.networkMapConnectionFuture ?: doneFuture(Unit)
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
@ -292,7 +292,8 @@ open class Node(override val configuration: FullNodeConfiguration,
super.initialiseDatabasePersistence(insideTransaction)
}
val startupComplete: ListenableFuture<Unit> = SettableFuture.create()
private val _startupComplete = openFuture<Unit>()
val startupComplete: CordaFuture<Unit> get() = _startupComplete
override fun start() {
if (initialiseSerialization) {
@ -320,7 +321,7 @@ open class Node(override val configuration: FullNodeConfiguration,
build().
start()
(startupComplete as SettableFuture<Unit>).set(Unit)
_startupComplete.set(Unit)
}
}, {})
shutdownHook = addShutdownHook {

View File

@ -5,10 +5,11 @@ import com.typesafe.config.ConfigException
import joptsimple.OptionException
import net.corda.core.crypto.commonName
import net.corda.core.crypto.orgName
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.*
import net.corda.core.node.services.ServiceInfo
import net.corda.core.then
import net.corda.core.thenMatch
import net.corda.core.utilities.loggerFor
import net.corda.node.*
import net.corda.node.serialization.NodeClock

View File

@ -1,7 +1,7 @@
package net.corda.node.services.api
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
@ -36,7 +36,7 @@ interface NetworkMapCacheInternal : NetworkMapCache {
* @param network the network messaging service.
* @param service the network map service to fetch current state from.
*/
fun deregisterForUpdates(network: MessagingService, service: NodeInfo): ListenableFuture<Unit>
fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture<Unit>
/**
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
@ -48,7 +48,7 @@ interface NetworkMapCacheInternal : NetworkMapCache {
* version is less than or equal to the given version, no update is fetched.
*/
fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
subscribe: Boolean, ifChangedSinceVer: Int? = null): CordaFuture<Unit>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)

View File

@ -9,7 +9,6 @@ import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.then
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.api.SchedulerService

View File

@ -1,12 +1,13 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import io.netty.handler.ssl.SslHandler
import net.corda.core.*
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.*
import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_TLS
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.div
import net.corda.core.internal.noneOrSingle
@ -107,12 +108,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private val _networkMapConnectionFuture = config.networkMapService?.let { SettableFuture.create<Unit>() }
private val _networkMapConnectionFuture = config.networkMapService?.let { openFuture<Unit>() }
/**
* A [ListenableFuture] which completes when the server successfully connects to the network map node. If a
* non-recoverable error is encountered then the Future will complete with an exception.
*/
val networkMapConnectionFuture: SettableFuture<Unit>? get() = _networkMapConnectionFuture
val networkMapConnectionFuture: CordaFuture<Unit>? get() = _networkMapConnectionFuture
private var networkChangeHandle: Subscription? = null
private val nodeRunsNetworkMapService = config.networkMapService == null

View File

@ -1,8 +1,8 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.PartyInfo
@ -147,13 +147,13 @@ inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossin
}
/**
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* Returns a [CordaFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
*/
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
val messageFuture = SettableFuture.create<M>()
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): CordaFuture<M> {
val messageFuture = openFuture<M>()
runOnNextMessage(topic, sessionId) { message ->
messageFuture.catch {
messageFuture.capture {
@Suppress("UNCHECKED_CAST")
message.data.deserialize<Any>() as M
}

View File

@ -1,9 +1,9 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.andForget
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.andForget
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
@ -11,12 +11,8 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.thenMatch
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.sequence
import net.corda.core.utilities.trace
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MonitoringService
@ -77,7 +73,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: CordaPersistence,
val networkMapRegistrationFuture: ListenableFuture<Unit>,
val networkMapRegistrationFuture: CordaFuture<Unit>,
val monitoringService: MonitoringService,
advertisedAddress: NetworkHostAndPort = serverAddress
) : ArtemisMessagingComponent(), MessagingService {

View File

@ -406,7 +406,7 @@ class ObservableContext(
}
}
object RpcServerObservableSerializer : Serializer<Observable<Any>>() {
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
private object RpcObservableContextKey
private val log = loggerFor<RpcServerObservableSerializer>()
@ -414,11 +414,11 @@ object RpcServerObservableSerializer : Serializer<Observable<Any>>() {
return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
}
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<Any>>?): Observable<Any> {
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<*>>?): Observable<Any> {
throw UnsupportedOperationException()
}
override fun write(kryo: Kryo, output: Output, observable: Observable<Any>) {
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
val observableId = RPCApi.ObservableId(random63BitValue())
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
output.writeLong(observableId.toLong, true)
@ -426,8 +426,8 @@ object RpcServerObservableSerializer : Serializer<Observable<Any>>() {
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
// must be done again within the subscriber
subscription = observable.materialize().subscribe(
object : Subscriber<Notification<Any>>() {
override fun onNext(observation: Notification<Any>) {
object : Subscriber<Notification<*>>() {
override fun onNext(observation: Notification<*>) {
if (!isUnsubscribed) {
observableContext.observationSendExecutor.submit {
observableContext.sendMessage(RPCApi.ServerToClient.Observation(observableId, observation))

View File

@ -1,6 +1,6 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
@ -16,12 +16,12 @@ interface ServiceRequestMessage {
}
/**
* Sends a [ServiceRequestMessage] to [target] and returns a [ListenableFuture] of the response.
* Sends a [ServiceRequestMessage] to [target] and returns a [CordaFuture] of the response.
* @param R The type of the response.
*/
fun <R : Any> MessagingService.sendRequest(topic: String,
request: ServiceRequestMessage,
target: MessageRecipients): ListenableFuture<R> {
target: MessageRecipients): CordaFuture<R> {
val responseFuture = onNext<R>(topic, request.sessionID)
send(topic, DEFAULT_SESSION_ID, request, target)
return responseFuture

View File

@ -1,12 +1,12 @@
package net.corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.map
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
@ -55,8 +55,8 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
override val changed: Observable<MapChange> = _changed.wrapWithDatabaseTransaction()
private val changePublisher: rx.Observer<MapChange> get() = _changed.bufferUntilDatabaseCommit()
private val _registrationFuture = SettableFuture.create<Unit>()
override val mapServiceRegistered: ListenableFuture<Unit> get() = _registrationFuture
private val _registrationFuture = openFuture<Void?>()
override val mapServiceRegistered: CordaFuture<Void?> get() = _registrationFuture
private var registeredForPush = false
protected var registeredNodes: MutableMap<PublicKey, NodeInfo> = Collections.synchronizedMap(HashMap())
@ -96,7 +96,7 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
}
override fun addMapService(network: MessagingService, networkMapAddress: SingleMessageRecipient, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
ifChangedSinceVer: Int?): CordaFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
network.addMessageHandler(NetworkMapService.PUSH_TOPIC, DEFAULT_SESSION_ID) { message, _ ->
@ -122,7 +122,7 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
nodes?.forEach { processRegistration(it) }
Unit
}
_registrationFuture.setFuture(future)
_registrationFuture.captureLater(future.map { null })
return future
}
@ -149,7 +149,7 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
* Unsubscribes from updates from the given map service.
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
override fun deregisterForUpdates(network: MessagingService, service: NodeInfo): CordaFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, network.myAddress)
// `network.getAddressOfParty(partyInfo)` is a work-around for MockNetwork and InMemoryMessaging to get rid of SingleMessageRecipient in NodeInfo.
@ -157,7 +157,7 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
val future = network.sendRequest<SubscribeResponse>(NetworkMapService.SUBSCRIPTION_TOPIC, req, address).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
_registrationFuture.setFuture(future)
_registrationFuture.captureLater(future.map { null })
return future
}
@ -181,6 +181,6 @@ open class InMemoryNetworkMapCache(private val serviceHub: ServiceHub?) : Single
@VisibleForTesting
override fun runWithoutMapService() {
_registrationFuture.set(Unit)
_registrationFuture.set(null)
}
}

View File

@ -4,13 +4,14 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.abbreviate
import net.corda.core.internal.staticField
import net.corda.core.transactions.SignedTransaction
@ -76,10 +77,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
*/
override val logger: Logger = LoggerFactory.getLogger("net.corda.flow.$id")
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
@Transient private var _resultFuture: OpenFuture<R>? = openFuture()
/** This future will complete when the call method returns. */
override val resultFuture: ListenableFuture<R>
get() = _resultFuture ?: SettableFuture.create<R>().also { _resultFuture = it }
override val resultFuture: CordaFuture<R>
get() = _resultFuture ?: openFuture<R>().also { _resultFuture = it }
// This state IS serialised, as we need it to know what the fiber is waiting for.
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()

View File

@ -6,8 +6,8 @@ import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.KryoException
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowException
@ -22,7 +22,6 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT
import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY
import net.corda.core.then
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
@ -144,7 +143,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun findServices(predicate: (Any) -> Boolean) = tokenizableServices.filter(predicate)
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, CordaFuture<T>>> {
@Suppress("UNCHECKED_CAST")
return mutex.locked {
stateMachines.keys.mapNotNull {

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.DigitalSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotaryException
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TimeWindowChecker
@ -13,6 +12,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.node.services.api.ServiceHubInternal

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.internal.concurrent.fork
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
@ -10,9 +10,5 @@ import java.util.concurrent.Executors
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService {
private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers))
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
return workerPool.submit {
transaction.verify()
}
}
override fun verify(transaction: LedgerTransaction) = workerPool.fork(transaction::verify)
}

View File

@ -2,11 +2,12 @@ package net.corda.node.services.transactions
import com.codahale.metrics.Gauge
import com.codahale.metrics.Timer
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.loggerFor
@ -24,7 +25,7 @@ abstract class OutOfProcessTransactionVerifierService(
private data class VerificationHandle(
val transactionId: SecureHash,
val resultFuture: SettableFuture<Unit>,
val resultFuture: OpenFuture<Unit>,
val durationTimerContext: Timer.Context
)
@ -61,9 +62,9 @@ abstract class OutOfProcessTransactionVerifierService(
abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction)
override fun verify(transaction: LedgerTransaction): ListenableFuture<*> {
override fun verify(transaction: LedgerTransaction): CordaFuture<*> {
log.info("Verifying ${transaction.id}")
val future = SettableFuture.create<Unit>()
val future = openFuture<Unit>()
val nonce = random63BitValue()
verificationHandles[nonce] = VerificationHandle(transaction.id, future, durationTimer.time())
sendRequest(nonce, transaction)

View File

@ -1,13 +1,12 @@
package net.corda.node.shell
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.StateMachineUpdate.Added
import net.corda.core.messaging.StateMachineUpdate.Removed
import net.corda.core.then
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import org.crsh.text.Color
@ -22,7 +21,7 @@ import rx.Subscriber
class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Subscriber<Any>() {
private val indexMap = HashMap<StateMachineRunId, Int>()
private val table = createStateMachinesTable()
val future: SettableFuture<Unit> = SettableFuture.create()
val future = openFuture<Unit>()
init {
// The future is public and can be completed by something else to indicate we don't wish to follow

View File

@ -7,16 +7,19 @@ import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.io.Closeables
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.*
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.write
import net.corda.core.internal.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.then
import net.corda.core.utilities.loggerFor
import net.corda.jackson.JacksonSupport
import net.corda.jackson.StringToMethodCallParser
@ -382,7 +385,7 @@ object InteractiveShell {
return result
}
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): ListenableFuture<Unit>? {
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit>? {
val printerFun = { obj: Any? -> yamlMapper.writeValueAsString(obj) }
toStream.println(printerFun(response))
toStream.flush()
@ -391,7 +394,7 @@ object InteractiveShell {
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
private var count = 0
val future: SettableFuture<Unit> = SettableFuture.create()
val future = openFuture<Unit>()
init {
// The future is public and can be completed by something else to indicate we don't wish to follow
@ -422,7 +425,7 @@ object InteractiveShell {
// Kotlin bug: USELESS_CAST warning is generated below but the IDE won't let us remove it.
@Suppress("USELESS_CAST", "UNCHECKED_CAST")
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): SettableFuture<Unit>? {
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): OpenFuture<Unit>? {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow

View File

@ -4,7 +4,6 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.SettableFuture
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.internal.until
import net.corda.core.then
import rx.Observable
import rx.Subscriber
import rx.subscriptions.Subscriptions
@ -12,7 +11,6 @@ import java.time.Clock
import java.time.Instant
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
/**
@ -106,17 +104,11 @@ fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = GuavaSettable
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our Fibers as there's no external effect of waiting.
*/
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>): SettableFuture<Boolean> {
return if (future is ListenableFuture) {
val settable = SettableFuture<Boolean>()
future.then { settable.set(true) }
settable
} else if (future is CompletableFuture) {
val settable = SettableFuture<Boolean>()
future.whenComplete(BiConsumer { _, _ -> settable.set(true) })
settable
} else {
throw IllegalArgumentException("Cannot make future $future Fiber friendly.")
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = SettableFuture<Boolean>().also { g ->
when (future) {
is ListenableFuture -> future.addListener(Runnable { g.set(true) }, Executor { it.run() })
is CompletionStage<*> -> future.whenComplete { _, _ -> g.set(true) }
else -> throw IllegalArgumentException("Cannot make future $future Fiber friendly.")
}
}

View File

@ -7,13 +7,13 @@ import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.getOrThrow
import net.corda.core.messaging.*
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.node.services.queryBy
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.internal.CordaRPCOpsImpl

View File

@ -1,7 +1,7 @@
package net.corda.node
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
@ -92,7 +92,7 @@ class InteractiveShellTest {
get() = throw UnsupportedOperationException()
override val id: StateMachineRunId
get() = throw UnsupportedOperationException()
override val resultFuture: ListenableFuture<Any?>
override val resultFuture: CordaFuture<Any?>
get() = throw UnsupportedOperationException()
override val flowInitiator: FlowInitiator
get() = throw UnsupportedOperationException()

View File

@ -2,6 +2,7 @@ package net.corda.node.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.contracts.CommercialPaper
import net.corda.core.concurrent.CordaFuture
import net.corda.contracts.asset.CASH
import net.corda.contracts.asset.Cash
import net.corda.contracts.asset.`issued by`
@ -13,14 +14,13 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.AnonymousPartyAndPath
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.rootCause
import net.corda.core.map
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.StateMachineTransactionMapping
@ -32,6 +32,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.days
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.unwrap
import net.corda.flows.TwoPartyTradeFlow.Buyer
@ -57,7 +58,6 @@ import java.io.ByteArrayOutputStream
import java.math.BigInteger
import java.security.KeyPair
import java.util.*
import java.util.concurrent.Future
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.test.assertEquals
@ -505,8 +505,8 @@ class TwoPartyTradeFlowTests {
private data class RunResult(
// The buyer is not created immediately, only when the seller starts running
val buyer: Future<FlowStateMachine<*>>,
val sellerResult: Future<SignedTransaction>,
val buyer: CordaFuture<FlowStateMachine<*>>,
val sellerResult: CordaFuture<SignedTransaction>,
val sellerId: StateMachineRunId
)

View File

@ -4,11 +4,11 @@ import net.corda.core.contracts.*
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.NotaryChangeFlow
import net.corda.core.flows.StateReplacementException
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService

View File

@ -1,10 +1,10 @@
package net.corda.node.services.messaging
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.RPCUserService
@ -31,7 +31,6 @@ import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.net.ServerSocket
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread
@ -50,7 +49,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
lateinit var config: NodeConfiguration
lateinit var database: CordaPersistence
lateinit var userService: RPCUserService
lateinit var networkMapRegistrationFuture: ListenableFuture<Unit>
lateinit var networkMapRegistrationFuture: CordaFuture<Unit>
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
@ -71,7 +70,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
myLegalName = ALICE.name)
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
networkMapRegistrationFuture = doneFuture(Unit)
}
@After
@ -135,7 +134,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
@Test
fun `client should be able to send message to itself before network map is available, and receive after`() {
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
val settableFuture = openFuture<Unit>()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
@ -160,7 +159,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
// Crank the iteration up as high as you want... just takes longer to run.
val iterations = 100
networkMapRegistrationFuture = SettableFuture.create()
networkMapRegistrationFuture = openFuture()
val receivedMessages = LinkedBlockingQueue<Message>()
@ -181,7 +180,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
messagingClient.stop()
messagingServer?.stop()
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
networkMapRegistrationFuture = doneFuture(Unit)
createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {

View File

@ -1,11 +1,11 @@
package net.corda.node.services.network
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.getOrThrow
import net.corda.core.concurrent.CordaFuture
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.send
@ -210,7 +210,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
private var lastSerial = Long.MIN_VALUE
private fun MockNode.registration(addOrRemove: AddOrRemove,
serial: Long? = null): ListenableFuture<RegistrationResponse> {
serial: Long? = null): CordaFuture<RegistrationResponse> {
val distinctSerial = if (serial == null) {
++lastSerial
} else {

View File

@ -1,8 +1,8 @@
package net.corda.node.services.network
import net.corda.core.getOrThrow
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.node.MockNetwork

View File

@ -2,17 +2,16 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.flatMap
import net.corda.core.flows.*
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.map
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.ServiceInfo
@ -25,6 +24,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.ProgressTracker.Change
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
@ -706,13 +706,13 @@ class FlowFrameworkTests {
return newNode.getSingleFlow<P>().first
}
private inline fun <reified P : FlowLogic<*>> MockNode.getSingleFlow(): Pair<P, ListenableFuture<*>> {
private inline fun <reified P : FlowLogic<*>> MockNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
return smm.findStateMachines(P::class.java).single()
}
private inline fun <reified P : FlowLogic<*>> MockNode.registerFlowFactory(
initiatingFlowClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P>
noinline flowFactory: (Party) -> P): CordaFuture<P>
{
val observable = internalRegisterFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> {
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): P {
@ -772,7 +772,7 @@ class FlowFrameworkTests {
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
private val FlowLogic<*>.progressSteps: ListenableFuture<List<Notification<ProgressTracker.Step>>> get() {
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>> get() {
return progressTracker!!.changes
.ofType(Change.Position::class.java)
.map { it.newStep }

View File

@ -6,8 +6,8 @@ import io.atomix.copycat.client.CopycatClient
import io.atomix.copycat.server.CopycatServer
import io.atomix.copycat.server.storage.Storage
import io.atomix.copycat.server.storage.StorageLevel
import net.corda.core.getOrThrow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase

View File

@ -1,16 +1,16 @@
package net.corda.node.services.transactions
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.DigitalSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
@ -132,7 +132,7 @@ class NotaryServiceTests {
notaryError.conflict.verified()
}
private fun runNotaryClient(stx: SignedTransaction): ListenableFuture<List<DigitalSignature.WithKey>> {
private fun runNotaryClient(stx: SignedTransaction): CordaFuture<List<DigitalSignature.WithKey>> {
val flow = NotaryFlow.Client(stx)
val future = clientNode.services.startFlow(flow).resultFuture
mockNet.runNetwork()

View File

@ -1,6 +1,6 @@
package net.corda.node.services.transactions
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
@ -8,9 +8,9 @@ import net.corda.core.crypto.DigitalSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.internal.AbstractNode
import net.corda.node.services.issueInvalidState
@ -85,7 +85,7 @@ class ValidatingNotaryServiceTests {
assertEquals(setOf(expectedMissingKey), missingKeys)
}
private fun runClient(stx: SignedTransaction): ListenableFuture<List<DigitalSignature.WithKey>> {
private fun runClient(stx: SignedTransaction): CordaFuture<List<DigitalSignature.WithKey>> {
val flow = NotaryFlow.Client(stx)
val future = clientNode.services.startFlow(flow).resultFuture
mockNet.runNetwork()

View File

@ -5,7 +5,7 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.getOrThrow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.hours
import net.corda.core.utilities.minutes
import net.corda.testing.node.TestClock

View File

@ -1,8 +1,8 @@
package net.corda.attachmentdemo
import com.google.common.util.concurrent.Futures
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
@ -19,11 +19,11 @@ class AttachmentDemoTest {
val numOfExpectedBytes = 10_000_000
driver(dsl = {
val demoUser = listOf(User("demo", "demo", setOf(startFlowPermission<AttachmentDemoFlow>())))
val (nodeA, nodeB) = Futures.allAsList(
val (nodeA, nodeB) = listOf(
startNode(DUMMY_BANK_A.name, rpcUsers = demoUser),
startNode(DUMMY_BANK_B.name, rpcUsers = demoUser),
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type)))
).getOrThrow()
).transpose().getOrThrow()
val senderThread = CompletableFuture.supplyAsync {
nodeA.rpcClientToNode().start(demoUser[0].username, demoUser[0].password).use {

View File

@ -1,16 +1,15 @@
package net.corda.attachmentdemo
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import joptsimple.OptionParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Contract
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.getOrThrow
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.Emoji
@ -22,6 +21,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.poll
@ -86,8 +86,8 @@ fun sender(rpc: CordaRPCOps, numOfClearBytes: Int = 1024) { // default size 1K.
private fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.SHA256, executor: ScheduledExecutorService) {
// Get the identity key of the other side (the recipient).
val notaryFuture: ListenableFuture<Party> = poll(executor, DUMMY_NOTARY.name.toString()) { rpc.partyFromX500Name(DUMMY_NOTARY.name) }
val otherSideFuture: ListenableFuture<Party> = poll(executor, DUMMY_BANK_B.name.toString()) { rpc.partyFromX500Name(DUMMY_BANK_B.name) }
val notaryFuture: CordaFuture<Party> = poll(executor, DUMMY_NOTARY.name.toString()) { rpc.partyFromX500Name(DUMMY_NOTARY.name) }
val otherSideFuture: CordaFuture<Party> = poll(executor, DUMMY_BANK_B.name.toString()) { rpc.partyFromX500Name(DUMMY_BANK_B.name) }
// Make sure we have the file in storage
if (!rpc.attachmentExists(hash)) {

View File

@ -1,13 +1,13 @@
package net.corda.bank
import com.google.common.util.concurrent.Futures
import net.corda.bank.api.BankOfCordaClientApi
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.core.getOrThrow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.testing.driver.driver
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.BOC
import net.corda.testing.driver.driver
import org.junit.Test
import kotlin.test.assertTrue
@ -15,10 +15,10 @@ class BankOfCordaHttpAPITest {
@Test
fun `issuer flow via Http`() {
driver(dsl = {
val (nodeBankOfCorda) = Futures.allAsList(
val (nodeBankOfCorda) = listOf(
startNode(BOC.name, setOf(ServiceInfo(SimpleNotaryService.type))),
startNode(BIGCORP_LEGAL_NAME)
).getOrThrow()
).transpose().getOrThrow()
val anonymous = false
val nodeBankOfCordaApiAddr = startWebserver(nodeBankOfCorda).getOrThrow().listenAddress
assertTrue(BankOfCordaClientApi(nodeBankOfCordaApiAddr).requestWebIssue(IssueRequestParams(1000, "USD", BIGCORP_LEGAL_NAME, "1", BOC.name, BOC.name, anonymous)))

View File

@ -1,14 +1,14 @@
package net.corda.bank
import com.google.common.util.concurrent.Futures
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.DOLLARS
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
@ -23,10 +23,10 @@ class BankOfCordaRPCClientTest {
driver(dsl = {
val bocManager = User("bocManager", "password1", permissions = setOf(startFlowPermission<IssuanceRequester>()))
val bigCorpCFO = User("bigCorpCFO", "password2", permissions = emptySet())
val (nodeBankOfCorda, nodeBigCorporation) = Futures.allAsList(
val (nodeBankOfCorda, nodeBigCorporation) = listOf(
startNode(BOC.name, setOf(ServiceInfo(SimpleNotaryService.type)), listOf(bocManager)),
startNode(BIGCORP_LEGAL_NAME, rpcUsers = listOf(bigCorpCFO))
).getOrThrow()
).transpose().getOrThrow()
// Bank of Corda RPC Client
val bocClient = nodeBankOfCorda.rpcClientToNode()

View File

@ -4,11 +4,11 @@ import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.contracts.currency
import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.flows.IssuerFlow.IssuanceRequester
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.http.HttpApi

View File

@ -2,10 +2,10 @@ package net.corda.bank.api
import net.corda.core.contracts.Amount
import net.corda.core.contracts.currency
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.flows.IssuerFlow.IssuanceRequester
import org.bouncycastle.asn1.x500.X500Name

View File

@ -1,12 +1,12 @@
package net.corda.irs
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.getOrThrow
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.ServiceInfo
import net.corda.core.toFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.irs.api.NodeInterestRates
import net.corda.irs.contract.InterestRateSwap
@ -37,19 +37,19 @@ class IRSDemoTest : IntegrationTestCategory {
@Test
fun `runs IRS demo`() {
driver(useTestClock = true, isDebug = true) {
val (controller, nodeA, nodeB) = Futures.allAsList(
val (controller, nodeA, nodeB) = listOf(
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type))),
startNode(DUMMY_BANK_A.name, rpcUsers = listOf(rpcUser)),
startNode(DUMMY_BANK_B.name)
).getOrThrow()
).transpose().getOrThrow()
println("All nodes started")
val (controllerAddr, nodeAAddr, nodeBAddr) = Futures.allAsList(
val (controllerAddr, nodeAAddr, nodeBAddr) = listOf(
startWebserver(controller),
startWebserver(nodeA),
startWebserver(nodeB)
).getOrThrow().map { it.listenAddress }
).transpose().getOrThrow().map { it.listenAddress }
println("All webservers started")

View File

@ -1,12 +1,10 @@
package net.corda.irs.api
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.filterStatesOfType
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.loggerFor
import net.corda.irs.contract.InterestRateSwap
import net.corda.irs.flows.AutoOfferFlow

View File

@ -1,8 +1,8 @@
package net.corda.irs
import com.google.common.util.concurrent.Futures
import net.corda.core.getOrThrow
import net.corda.core.internal.concurrent.transpose
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
@ -16,11 +16,11 @@ import net.corda.testing.driver.driver
*/
fun main(args: Array<String>) {
driver(dsl = {
val (controller, nodeA, nodeB) = Futures.allAsList(
val (controller, nodeA, nodeB) = listOf(
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type), ServiceInfo(NodeInterestRates.Oracle.type))),
startNode(DUMMY_BANK_A.name),
startNode(DUMMY_BANK_B.name)
).getOrThrow()
).transpose().getOrThrow()
startWebserver(controller)
startWebserver(nodeA)

View File

@ -1,8 +1,8 @@
package net.corda.irs.api
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.irs.flows.UpdateBusinessDayFlow
import java.time.LocalDate

View File

@ -9,11 +9,11 @@ import net.corda.contracts.asset.`owned by`
import net.corda.core.contracts.*
import net.corda.core.crypto.MerkleTreeException
import net.corda.core.crypto.generateKeyPair
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.irs.flows.RatesFixFlow
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase

View File

@ -13,7 +13,6 @@ import javafx.stage.Stage
import javafx.util.Duration
import net.corda.core.crypto.commonName
import net.corda.core.serialization.deserialize
import net.corda.core.then
import net.corda.core.utilities.ProgressTracker
import net.corda.netmap.VisualiserViewModel.Style
import net.corda.netmap.simulation.IRSSimulation

View File

@ -3,17 +3,15 @@ package net.corda.netmap.simulation
import co.paralleluniverse.fibers.Suspendable
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.google.common.util.concurrent.*
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateAndRef
import net.corda.core.flatMap
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.map
import net.corda.core.node.services.queryBy
import net.corda.core.thenMatch
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.TwoPartyDealFlow.Acceptor
@ -43,8 +41,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
private val executeOnNextIteration = Collections.synchronizedList(LinkedList<() -> Unit>())
override fun startMainSimulation(): ListenableFuture<Unit> {
val future = SettableFuture.create<Unit>()
override fun startMainSimulation(): CordaFuture<Unit> {
val future = openFuture<Unit>()
om = JacksonSupport.createInMemoryMapper(InMemoryIdentityService((banks + regulators + networkMap).map { it.info.legalIdentityAndCert }, trustRoot = DUMMY_CA.certificate))
startIRSDealBetween(0, 1).thenMatch({
@ -53,32 +51,30 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
executeOnNextIteration.add {
// Keep fixing until there's no more left to do.
val initialFixFuture = doNextFixing(0, 1)
fun onFailure(t: Throwable) {
future.setException(t) // Propagate the error.
}
Futures.addCallback(initialFixFuture, object : FutureCallback<Unit> {
override fun onFailure(t: Throwable) {
future.setException(t) // Propagate the error.
}
override fun onSuccess(result: Unit?) {
// Pause for an iteration.
executeOnNextIteration.add {}
executeOnNextIteration.add {
val f = doNextFixing(0, 1)
if (f != null) {
Futures.addCallback(f, this, MoreExecutors.directExecutor())
} else {
// All done!
future.set(Unit)
}
fun onSuccess(result: Unit?) {
// Pause for an iteration.
executeOnNextIteration.add {}
executeOnNextIteration.add {
val f = doNextFixing(0, 1)
if (f != null) {
f.thenMatch(::onSuccess, ::onFailure)
} else {
// All done!
future.set(Unit)
}
}
}, MoreExecutors.directExecutor())
}
initialFixFuture!!.thenMatch(::onSuccess, ::onFailure)
}
}, {})
return future
}
private fun doNextFixing(i: Int, j: Int): ListenableFuture<Unit>? {
private fun doNextFixing(i: Int, j: Int): CordaFuture<Unit>? {
println("Doing a fixing between $i and $j")
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
@ -104,10 +100,10 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
if (nextFixingDate > currentDateAndTime.toLocalDate())
currentDateAndTime = nextFixingDate.atTime(15, 0)
return Futures.allAsList(futA, futB).map { Unit }
return listOf(futA, futB).transpose().map { Unit }
}
private fun startIRSDealBetween(i: Int, j: Int): ListenableFuture<SignedTransaction> {
private fun startIRSDealBetween(i: Int, j: Int): CordaFuture<SignedTransaction> {
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
@ -151,7 +147,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
node1.services.legalIdentityKey)
val instigatorTxFuture = node1.services.startFlow(instigator).resultFuture
return Futures.allAsList(instigatorTxFuture, acceptorTxFuture).flatMap { instigatorTxFuture }
return listOf(instigatorTxFuture, acceptorTxFuture).transpose().flatMap { instigatorTxFuture }
}
override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {

View File

@ -1,15 +1,16 @@
package net.corda.netmap.simulation
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.locationOrNull
import net.corda.core.flatMap
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CityDatabase
import net.corda.core.node.WorldMapLocation
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.containsType
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.transpose
import net.corda.testing.DUMMY_MAP
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_REGULATOR
@ -260,10 +261,9 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
}
val networkInitialisationFinished: ListenableFuture<*> =
Futures.allAsList(mockNet.nodes.map { it.networkMapRegistrationFuture })
val networkInitialisationFinished = mockNet.nodes.map { it.networkMapRegistrationFuture }.transpose()
fun start(): ListenableFuture<Unit> {
fun start(): CordaFuture<Unit> {
mockNet.startNodes()
// Wait for all the nodes to have finished registering with the network map service.
return networkInitialisationFinished.flatMap { startMainSimulation() }
@ -273,8 +273,8 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
* Sub-classes should override this to trigger whatever they want to simulate. This method will be invoked once the
* network bringup has been simulated.
*/
protected open fun startMainSimulation(): ListenableFuture<Unit> {
return Futures.immediateFuture(Unit)
protected open fun startMainSimulation(): CordaFuture<Unit> {
return doneFuture(Unit)
}
fun stop() {

View File

@ -1,6 +1,6 @@
package net.corda.netmap.simulation
import net.corda.core.getOrThrow
import net.corda.core.utilities.getOrThrow
import net.corda.testing.LogHelper
import org.junit.Test

View File

@ -1,16 +1,16 @@
package net.corda.notarydemo
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.notUsed
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.toStringShort
import net.corda.core.getOrThrow
import net.corda.core.map
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.notarydemo.flows.DummyIssueAndMove
import net.corda.notarydemo.flows.RPCStartableNotaryFlowClient
import net.corda.testing.BOB
@ -55,9 +55,9 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
* as it consumes the original asset and creates a copy with the new owner as its output.
*/
private fun buildTransactions(count: Int): List<SignedTransaction> {
return Futures.allAsList((1..count).map {
return (1..count).map {
rpc.startFlow(::DummyIssueAndMove, notary, counterpartyNode.legalIdentity, it).returnValue
}).getOrThrow()
}.transpose().getOrThrow()
}
/**
@ -66,7 +66,7 @@ private class NotaryDemoClientApi(val rpc: CordaRPCOps) {
*
* @return a list of encoded signer public keys - one for every transaction
*/
private fun notariseTransactions(transactions: List<SignedTransaction>): List<ListenableFuture<List<String>>> {
private fun notariseTransactions(transactions: List<SignedTransaction>): List<CordaFuture<List<String>>> {
return transactions.map {
rpc.startFlow(::RPCStartableNotaryFlowClient, it).returnValue.map { it.map { it.by.toStringShort() } }
}

Some files were not shown because too many files have changed in this diff Show More