Better use of Futures

This commit is contained in:
Shams Asari 2016-10-04 11:21:40 +01:00
parent 94e830d19a
commit 5e6533eb8a
11 changed files with 87 additions and 96 deletions

View File

@ -1,26 +1,28 @@
package com.r3corda.client
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.map
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.messaging.onNext
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.success
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.monitor.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.OUT_EVENT_TOPIC
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.STATE_TOPIC
import rx.Observable
import rx.Observer
/**
* Worked example of a client which communicates with the wallet monitor service.
*/
private val log: Logger = LoggerFactory.getLogger(NodeMonitorClient::class.java)
class NodeMonitorClient(
val net: MessagingService,
val node: NodeInfo,
@ -28,35 +30,31 @@ class NodeMonitorClient(
val inEvents: Observer<ServiceToClientEvent>,
val snapshot: Observer<StateSnapshotMessage>
) {
private val sessionID = random63BitValue()
companion object {
private val log = loggerFor<NodeMonitorClient>()
}
fun register(): ListenableFuture<Boolean> {
val sessionID = random63BitValue()
val future = SettableFuture.create<Boolean>()
log.info("Registering with ID $sessionID. I am ${net.myAddress}")
net.addMessageHandler(NodeMonitorService.REGISTER_TOPIC, sessionID) { msg, reg ->
val resp = msg.data.deserialize<RegisterResponse>()
net.removeMessageHandler(reg)
future.set(resp.success)
}
net.addMessageHandler(NodeMonitorService.STATE_TOPIC, sessionID) { msg, reg ->
val snapshotMessage = msg.data.deserialize<StateSnapshotMessage>()
net.removeMessageHandler(reg)
snapshot.onNext(snapshotMessage)
}
val future = net.onNext<RegisterResponse>(REGISTER_TOPIC, sessionID).map { it.success }
net.addMessageHandler(NodeMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->
net.onNext<StateSnapshotMessage>(STATE_TOPIC, sessionID).success { snapshot.onNext(it) }
net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg ->
val event = msg.data.deserialize<ServiceToClientEvent>()
inEvents.onNext(event)
}
val req = RegisterRequest(net.myAddress, sessionID)
val registerMessage = net.createMessage(NodeMonitorService.REGISTER_TOPIC, 0, req.serialize().bits)
val registerMessage = net.createMessage(REGISTER_TOPIC, 0, req.serialize().bits)
net.send(registerMessage, node.address)
outEvents.subscribe { event ->
val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event)
val message = net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
val message = net.createMessage(OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
net.send(message, node.address)
}

View File

@ -28,22 +28,22 @@ import kotlin.reflect.KProperty
val Int.days: Duration get() = Duration.ofDays(this.toLong())
@Suppress("unused") // It's here for completeness
val Int.hours: Duration get() = Duration.ofHours(this.toLong())
@Suppress("unused") // It's here for completeness
val Int.minutes: Duration get() = Duration.ofMinutes(this.toLong())
val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong())
// TODO: Review by EOY2016 if we ever found these utilities helpful.
@Suppress("unused") val Int.bd: BigDecimal get() = BigDecimal(this)
@Suppress("unused") val Double.bd: BigDecimal get() = BigDecimal(this)
@Suppress("unused") val String.bd: BigDecimal get() = BigDecimal(this)
@Suppress("unused") val Long.bd: BigDecimal get() = BigDecimal(this)
val Int.bd: BigDecimal get() = BigDecimal(this)
val Double.bd: BigDecimal get() = BigDecimal(this)
val String.bd: BigDecimal get() = BigDecimal(this)
val Long.bd: BigDecimal get() = BigDecimal(this)
fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + ""
/** Like the + operator but throws an exception in case of integer overflow. */
infix fun Int.checkedAdd(b: Int) = Math.addExact(this, b)
/** Like the + operator but throws an exception in case of integer overflow. */
@Suppress("unused")
infix fun Long.checkedAdd(b: Long) = Math.addExact(this, b)
/**
@ -80,15 +80,12 @@ infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): Listenable
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, Function { mapper(it!!) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
// TODO This is not used but there's existing code that can be replaced by this
fun <T> SettableFuture<T>.setFrom(logger: Logger? = null, block: () -> T): SettableFuture<T> {
inline fun <T> SettableFuture<T>.catch(block: () -> T) {
try {
set(block())
} catch (e: Exception) {
logger?.error("Caught exception", e)
setException(e)
} catch (t: Throwable) {
setException(t)
}
return this
}
fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)

View File

@ -456,10 +456,5 @@ data class Commodity(val commodityCode: String,
* Subsequent copies and evolutions of a state should just copy the externalId and Id fields unmodified.
*/
data class UniqueIdentifier(val externalId: String? = null, val id: UUID = UUID.randomUUID()) {
override fun toString(): String {
if (externalId != null) {
return "${externalId}_${id.toString()}"
}
return id.toString()
}
override fun toString(): String = if (externalId != null) "${externalId}_$id" else id.toString()
}

View File

@ -1,7 +1,11 @@
package com.r3corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.catch
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import java.time.Instant
import java.util.*
@ -117,7 +121,7 @@ fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor:
*
* @param topicSession identifier for the topic and session to listen for messages arriving on.
*/
fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, callback: (Message) -> Unit) {
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, crossinline callback: (Message) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topicSession, executor) { msg, reg ->
removeMessageHandler(reg)
@ -127,6 +131,20 @@ fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Exec
}
}
/**
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* The payload is deserilaized to an object of type [M]. Any exceptions thrown will be captured by the future.
*/
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long, executor: Executor? = null): ListenableFuture<M> {
val messageFuture = SettableFuture.create<M>()
runOnNextMessage(topic, sessionId, executor) { message ->
messageFuture.catch {
message.data.deserialize<M>()
}
}
return messageFuture
}
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)

View File

@ -9,7 +9,7 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.onNext
import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.CordaPluginRegistry
import com.r3corda.core.node.NodeInfo
@ -37,6 +37,7 @@ import com.r3corda.node.services.monitor.NodeMonitorService
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.RegistrationResponse
import com.r3corda.node.services.network.NodeRegistration
import com.r3corda.node.services.network.PersistentNetworkMapService
import com.r3corda.node.services.persistence.*
@ -357,7 +358,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
"has any other map node been configured.")
}
private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture<NetworkMapService.RegistrationResponse> {
private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture<RegistrationResponse> {
// Register this node against the network
val instant = platformClock.instant()
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
@ -365,13 +366,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
val sessionID = random63BitValue()
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits)
val future = SettableFuture.create<NetworkMapService.RegistrationResponse>()
net.runOnNextMessage(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread) { message ->
future.set(message.data.deserialize())
}
val future = net.onNext<RegistrationResponse>(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread)
net.send(message, networkMapAddr)
return future
}

View File

@ -2,10 +2,11 @@ package com.r3corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.Contract
import com.r3corda.core.crypto.Party
import com.r3corda.core.map
import com.r3corda.core.messaging.*
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
@ -19,6 +20,10 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.RegulatorService
import com.r3corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import com.r3corda.node.services.network.NetworkMapService.FetchMapResponse
import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse
import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.node.utilities.AddOrRemove
import rx.Observable
@ -82,15 +87,13 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
_registrationFuture.setFuture(future)
net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.FetchMapResponse>()
val future = net.onNext<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map { resp ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
future.set(Unit)
Unit
}
net.send(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress)
net.send(FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress)
_registrationFuture.setFuture(future)
return future
}
@ -121,29 +124,24 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
// Add a message handler for the response, and prepare a future to put the data into.
// Note that the message handler will run on the network thread (not this one).
val future = SettableFuture.create<Unit>()
_registrationFuture.setFuture(future)
net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message ->
val resp = message.data.deserialize<NetworkMapService.SubscribeResponse>()
if (resp.confirmed) {
future.set(Unit)
} else {
future.setException(NetworkCacheError.DeregistrationFailed())
}
val future = net.onNext<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
net.send(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
_registrationFuture.setFuture(future)
net.send(SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
return future
}
fun processUpdatePush(req: NetworkMapService.Update) {
val reg: NodeRegistration
try {
reg = req.wireReg.verified()
} catch(e: SignatureException) {
val reg = req.wireReg.verified()
processRegistration(reg)
} catch (e: SignatureException) {
throw NodeMapError.InvalidSignature()
}
processRegistration(reg)
}
private fun processRegistration(reg: NodeRegistration) {

View File

@ -1,25 +1,27 @@
package com.r3corda.node.services.persistence
import co.paralleluniverse.strands.SettableFuture
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.primitives.Ints
import com.r3corda.core.transactions.SignedTransaction
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.NullPublicKey
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.transactions.SignedTransaction
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.nio.file.FileSystem
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class PerFileTransactionStorageTests {
val fileSystem = Jimfs.newFileSystem(unix())
val storeDir = fileSystem.getPath("store")
val fileSystem: FileSystem = Jimfs.newFileSystem(unix())
val storeDir: Path = fileSystem.getPath("store")
lateinit var transactionStorage: PerFileTransactionStorage
@Before
@ -74,7 +76,7 @@ class PerFileTransactionStorageTests {
@Test
fun `updates are fired`() {
val future = SettableFuture<SignedTransaction>()
val future = SettableFuture.create<SignedTransaction>()
transactionStorage.updates.subscribe { tx -> future.set(tx) }
val expected = newTransaction()
transactionStorage.addTransaction(expected)

View File

@ -9,8 +9,8 @@ import com.r3corda.contracts.InterestRateSwap
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.contracts.UniqueIdentifier
import com.r3corda.core.failure
import com.r3corda.core.flatMap
import com.r3corda.core.map
import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.success
import com.r3corda.core.transactions.SignedTransaction
@ -83,15 +83,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
extraNodeLabels[node1] = "Fixing event on $nextFixingDate"
extraNodeLabels[node2] = "Fixing event on $nextFixingDate"
val retFuture = SettableFuture.create<Unit>()
// Complete the future when the state has been consumed on both nodes
val futA = node1.services.vaultService.whenConsumed(theDealRef.ref)
val futB = node2.services.vaultService.whenConsumed(theDealRef.ref)
Futures.allAsList(futA, futB) success {
retFuture.set(null)
} failure { throwable ->
retFuture.setException(throwable)
}
showProgressFor(listOf(node1, node2))
showConsensusFor(listOf(node1, node2, regulators[0]))
@ -100,7 +94,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
if (nextFixingDate > currentDateAndTime.toLocalDate())
currentDateAndTime = nextFixingDate.atTime(15, 0)
return retFuture
return Futures.allAsList(futA, futB).map { Unit }
}
private fun startIRSDealBetween(i: Int, j: Int): ListenableFuture<SignedTransaction> {
@ -125,7 +119,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val instigator = Instigator(node2.info.identity, AutoOffer(notary.info.identity, irs), node1.keyPair!!)
val instigatorTx = node1.services.startProtocol("instigator", instigator)
return Futures.transformAsync(Futures.allAsList(instigatorTx, acceptorTx)) { instigatorTx }
return Futures.allAsList(instigatorTx, acceptorTx).flatMap { instigatorTx }
}
override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {

View File

@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.flatMap
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.CityDatabase
import com.r3corda.core.node.PhysicalLocation
@ -300,15 +301,13 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
}
}
@Suppress("unused") // Used from the network visualiser tool.
val networkInitialisationFinished: ListenableFuture<*> =
Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
fun start(): ListenableFuture<Unit> {
network.startNodes()
// Wait for all the nodes to have finished registering with the network map service.
val startup: ListenableFuture<List<Unit>> = Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
return Futures.transformAsync(startup) { l: List<Unit>? -> startMainSimulation() }
return networkInitialisationFinished.flatMap { startMainSimulation() }
}
/**

View File

@ -1,6 +1,6 @@
package com.r3corda.testing
import co.paralleluniverse.strands.SettableFuture
import com.google.common.util.concurrent.SettableFuture
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Observable
@ -131,7 +131,7 @@ fun <S, E : Any> S.genericExpectEvents(
stream: S.((E) -> Unit) -> Unit,
expectCompose: () -> ExpectCompose<E>
) {
val finishFuture = SettableFuture<Unit>()
val finishFuture = SettableFuture.create<Unit>()
/**
* Internally we create a "lazy" state automaton. The outgoing edges are state.getExpectedEvents() modulo additional
* matching logic. When an event comes we extract the first edge that matches using state.nextState(event), which

View File

@ -4,19 +4,17 @@ import com.google.common.jimfs.Jimfs
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.crypto.Party
import com.r3corda.core.div
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.messaging.TopicSession
import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.onNext
import com.r3corda.core.messaging.send
import com.r3corda.core.node.PhysicalLocation
import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.node.services.VaultService
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.testing.InMemoryVaultService
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
import com.r3corda.core.utilities.loggerFor
@ -153,12 +151,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
services.networkService.send(TopicSession(topic), payload, target.info.address)
}
inline fun <reified T : Any> receive(topic: String, sessionId: Long): ListenableFuture<T> {
val receive = SettableFuture.create<T>()
services.networkService.runOnNextMessage(topic, sessionId) {
receive.set(it.data.deserialize<T>())
}
return receive
fun <M : Any> receive(topic: String, sessionId: Long): ListenableFuture<M> {
return services.networkService.onNext<M>(topic, sessionId)
}
inline fun <reified T : Any> sendAndReceive(topic: String,