From 5e6533eb8a40337a555bbd55fed3cc99680b20a3 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 4 Oct 2016 11:21:40 +0100 Subject: [PATCH] Better use of Futures --- .../com/r3corda/client/NodeMonitorClient.kt | 40 +++++++++---------- .../src/main/kotlin/com/r3corda/core/Utils.kt | 19 ++++----- .../r3corda/core/contracts/FinanceTypes.kt | 7 +--- .../com/r3corda/core/messaging/Messaging.kt | 20 +++++++++- .../com/r3corda/node/internal/AbstractNode.kt | 12 ++---- .../network/InMemoryNetworkMapCache.kt | 40 +++++++++---------- .../PerFileTransactionStorageTests.kt | 12 +++--- .../com/r3corda/simulation/IRSSimulation.kt | 12 ++---- .../com/r3corda/simulation/Simulation.kt | 5 +-- .../main/kotlin/com/r3corda/testing/Expect.kt | 4 +- .../com/r3corda/testing/node/MockNode.kt | 12 ++---- 11 files changed, 87 insertions(+), 96 deletions(-) diff --git a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt b/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt index 83b25b18c8..64f7fbdbfe 100644 --- a/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt +++ b/client/src/main/kotlin/com/r3corda/client/NodeMonitorClient.kt @@ -1,26 +1,28 @@ package com.r3corda.client import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.contracts.ClientToServiceCommand +import com.r3corda.core.map import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.createMessage +import com.r3corda.core.messaging.onNext import com.r3corda.core.node.NodeInfo import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize +import com.r3corda.core.success +import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.monitor.* -import org.slf4j.Logger -import org.slf4j.LoggerFactory +import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC +import com.r3corda.node.services.monitor.NodeMonitorService.Companion.OUT_EVENT_TOPIC +import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC +import com.r3corda.node.services.monitor.NodeMonitorService.Companion.STATE_TOPIC import rx.Observable import rx.Observer /** * Worked example of a client which communicates with the wallet monitor service. */ - -private val log: Logger = LoggerFactory.getLogger(NodeMonitorClient::class.java) - class NodeMonitorClient( val net: MessagingService, val node: NodeInfo, @@ -28,35 +30,31 @@ class NodeMonitorClient( val inEvents: Observer, val snapshot: Observer ) { - private val sessionID = random63BitValue() + + companion object { + private val log = loggerFor() + } fun register(): ListenableFuture { + val sessionID = random63BitValue() - val future = SettableFuture.create() log.info("Registering with ID $sessionID. I am ${net.myAddress}") - net.addMessageHandler(NodeMonitorService.REGISTER_TOPIC, sessionID) { msg, reg -> - val resp = msg.data.deserialize() - net.removeMessageHandler(reg) - future.set(resp.success) - } - net.addMessageHandler(NodeMonitorService.STATE_TOPIC, sessionID) { msg, reg -> - val snapshotMessage = msg.data.deserialize() - net.removeMessageHandler(reg) - snapshot.onNext(snapshotMessage) - } + val future = net.onNext(REGISTER_TOPIC, sessionID).map { it.success } - net.addMessageHandler(NodeMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg -> + net.onNext(STATE_TOPIC, sessionID).success { snapshot.onNext(it) } + + net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg -> val event = msg.data.deserialize() inEvents.onNext(event) } val req = RegisterRequest(net.myAddress, sessionID) - val registerMessage = net.createMessage(NodeMonitorService.REGISTER_TOPIC, 0, req.serialize().bits) + val registerMessage = net.createMessage(REGISTER_TOPIC, 0, req.serialize().bits) net.send(registerMessage, node.address) outEvents.subscribe { event -> val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event) - val message = net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits) + val message = net.createMessage(OUT_EVENT_TOPIC, 0, envelope.serialize().bits) net.send(message, node.address) } diff --git a/core/src/main/kotlin/com/r3corda/core/Utils.kt b/core/src/main/kotlin/com/r3corda/core/Utils.kt index 0fa7447194..fc2d40768b 100644 --- a/core/src/main/kotlin/com/r3corda/core/Utils.kt +++ b/core/src/main/kotlin/com/r3corda/core/Utils.kt @@ -28,22 +28,22 @@ import kotlin.reflect.KProperty val Int.days: Duration get() = Duration.ofDays(this.toLong()) @Suppress("unused") // It's here for completeness val Int.hours: Duration get() = Duration.ofHours(this.toLong()) -@Suppress("unused") // It's here for completeness val Int.minutes: Duration get() = Duration.ofMinutes(this.toLong()) val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong()) // TODO: Review by EOY2016 if we ever found these utilities helpful. -@Suppress("unused") val Int.bd: BigDecimal get() = BigDecimal(this) -@Suppress("unused") val Double.bd: BigDecimal get() = BigDecimal(this) -@Suppress("unused") val String.bd: BigDecimal get() = BigDecimal(this) -@Suppress("unused") val Long.bd: BigDecimal get() = BigDecimal(this) +val Int.bd: BigDecimal get() = BigDecimal(this) +val Double.bd: BigDecimal get() = BigDecimal(this) +val String.bd: BigDecimal get() = BigDecimal(this) +val Long.bd: BigDecimal get() = BigDecimal(this) fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "…" /** Like the + operator but throws an exception in case of integer overflow. */ infix fun Int.checkedAdd(b: Int) = Math.addExact(this, b) /** Like the + operator but throws an exception in case of integer overflow. */ +@Suppress("unused") infix fun Long.checkedAdd(b: Long) = Math.addExact(this, b) /** @@ -80,15 +80,12 @@ infix fun ListenableFuture.failure(body: (Throwable) -> Unit): Listenable infix fun ListenableFuture.map(mapper: (F) -> T): ListenableFuture = Futures.transform(this, Function { mapper(it!!) }) infix fun ListenableFuture.flatMap(mapper: (F) -> ListenableFuture): ListenableFuture = Futures.transformAsync(this) { mapper(it!!) } /** Executes the given block and sets the future to either the result, or any exception that was thrown. */ -// TODO This is not used but there's existing code that can be replaced by this -fun SettableFuture.setFrom(logger: Logger? = null, block: () -> T): SettableFuture { +inline fun SettableFuture.catch(block: () -> T) { try { set(block()) - } catch (e: Exception) { - logger?.error("Caught exception", e) - setException(e) + } catch (t: Throwable) { + setException(t) } - return this } fun Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block) diff --git a/core/src/main/kotlin/com/r3corda/core/contracts/FinanceTypes.kt b/core/src/main/kotlin/com/r3corda/core/contracts/FinanceTypes.kt index c147080889..1d91ddfee1 100644 --- a/core/src/main/kotlin/com/r3corda/core/contracts/FinanceTypes.kt +++ b/core/src/main/kotlin/com/r3corda/core/contracts/FinanceTypes.kt @@ -456,10 +456,5 @@ data class Commodity(val commodityCode: String, * Subsequent copies and evolutions of a state should just copy the externalId and Id fields unmodified. */ data class UniqueIdentifier(val externalId: String? = null, val id: UUID = UUID.randomUUID()) { - override fun toString(): String { - if (externalId != null) { - return "${externalId}_${id.toString()}" - } - return id.toString() - } + override fun toString(): String = if (externalId != null) "${externalId}_$id" else id.toString() } \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index 83867a4047..359a95d083 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -1,7 +1,11 @@ package com.r3corda.core.messaging +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.core.catch import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef +import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import java.time.Instant import java.util.* @@ -117,7 +121,7 @@ fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: * * @param topicSession identifier for the topic and session to listen for messages arriving on. */ -fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, callback: (Message) -> Unit) { +inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, crossinline callback: (Message) -> Unit) { val consumed = AtomicBoolean() addMessageHandler(topicSession, executor) { msg, reg -> removeMessageHandler(reg) @@ -127,6 +131,20 @@ fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Exec } } +/** + * Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId. + * The payload is deserilaized to an object of type [M]. Any exceptions thrown will be captured by the future. + */ +fun MessagingService.onNext(topic: String, sessionId: Long, executor: Executor? = null): ListenableFuture { + val messageFuture = SettableFuture.create() + runOnNextMessage(topic, sessionId, executor) { message -> + messageFuture.catch { + message.data.deserialize() + } + } + return messageFuture +} + fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) = send(TopicSession(topic, sessionID), payload, to, uuid) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 1eb2ab54c1..337ea56d33 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -9,7 +9,7 @@ import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.X509Utilities import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.createMessage -import com.r3corda.core.messaging.runOnNextMessage +import com.r3corda.core.messaging.onNext import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.CordaPluginRegistry import com.r3corda.core.node.NodeInfo @@ -37,6 +37,7 @@ import com.r3corda.node.services.monitor.NodeMonitorService import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC +import com.r3corda.node.services.network.NetworkMapService.RegistrationResponse import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.network.PersistentNetworkMapService import com.r3corda.node.services.persistence.* @@ -357,7 +358,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap "has any other map node been configured.") } - private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture { + private fun updateRegistration(networkMapAddr: SingleMessageRecipient, type: AddOrRemove): ListenableFuture { // Register this node against the network val instant = platformClock.instant() val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD @@ -365,13 +366,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap val sessionID = random63BitValue() val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits) - val future = SettableFuture.create() - - net.runOnNextMessage(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread) { message -> - future.set(message.data.deserialize()) - } + val future = net.onNext(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread) net.send(message, networkMapAddr) - return future } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 1084a98076..717c50cf81 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -2,10 +2,11 @@ package com.r3corda.node.services.network import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture +import com.r3corda.core.RunOnCallerThread import com.r3corda.core.contracts.Contract import com.r3corda.core.crypto.Party +import com.r3corda.core.map import com.r3corda.core.messaging.* import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID @@ -19,6 +20,10 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.services.api.RegulatorService +import com.r3corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC +import com.r3corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC +import com.r3corda.node.services.network.NetworkMapService.FetchMapResponse +import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.utilities.AddOrRemove import rx.Observable @@ -82,15 +87,13 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach // Add a message handler for the response, and prepare a future to put the data into. // Note that the message handler will run on the network thread (not this one). - val future = SettableFuture.create() - _registrationFuture.setFuture(future) - net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message -> - val resp = message.data.deserialize() + val future = net.onNext(FETCH_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map { resp -> // We may not receive any nodes back, if the map hasn't changed since the version specified resp.nodes?.forEach { processRegistration(it) } - future.set(Unit) + Unit } - net.send(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress) + net.send(FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress) + _registrationFuture.setFuture(future) return future } @@ -121,29 +124,24 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach // Add a message handler for the response, and prepare a future to put the data into. // Note that the message handler will run on the network thread (not this one). - val future = SettableFuture.create() - _registrationFuture.setFuture(future) - net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, MoreExecutors.directExecutor()) { message -> - val resp = message.data.deserialize() - if (resp.confirmed) { - future.set(Unit) - } else { - future.setException(NetworkCacheError.DeregistrationFailed()) - } + + val future = net.onNext(SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map { + if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed() } - net.send(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address) + _registrationFuture.setFuture(future) + + net.send(SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address) return future } fun processUpdatePush(req: NetworkMapService.Update) { - val reg: NodeRegistration try { - reg = req.wireReg.verified() - } catch(e: SignatureException) { + val reg = req.wireReg.verified() + processRegistration(reg) + } catch (e: SignatureException) { throw NodeMapError.InvalidSignature() } - processRegistration(reg) } private fun processRegistration(reg: NodeRegistration) { diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt index 7dc38dbf0b..aa928bd335 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt @@ -1,25 +1,27 @@ package com.r3corda.node.services.persistence -import co.paralleluniverse.strands.SettableFuture import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.primitives.Ints -import com.r3corda.core.transactions.SignedTransaction +import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.NullPublicKey import com.r3corda.core.serialization.SerializedBytes +import com.r3corda.core.transactions.SignedTransaction import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Test +import java.nio.file.FileSystem import java.nio.file.Files +import java.nio.file.Path import java.util.concurrent.TimeUnit import kotlin.test.assertEquals class PerFileTransactionStorageTests { - val fileSystem = Jimfs.newFileSystem(unix()) - val storeDir = fileSystem.getPath("store") + val fileSystem: FileSystem = Jimfs.newFileSystem(unix()) + val storeDir: Path = fileSystem.getPath("store") lateinit var transactionStorage: PerFileTransactionStorage @Before @@ -74,7 +76,7 @@ class PerFileTransactionStorageTests { @Test fun `updates are fired`() { - val future = SettableFuture() + val future = SettableFuture.create() transactionStorage.updates.subscribe { tx -> future.set(tx) } val expected = newTransaction() transactionStorage.addTransaction(expected) diff --git a/src/main/kotlin/com/r3corda/simulation/IRSSimulation.kt b/src/main/kotlin/com/r3corda/simulation/IRSSimulation.kt index 1e30cb4e10..64cd030afb 100644 --- a/src/main/kotlin/com/r3corda/simulation/IRSSimulation.kt +++ b/src/main/kotlin/com/r3corda/simulation/IRSSimulation.kt @@ -9,8 +9,8 @@ import com.r3corda.contracts.InterestRateSwap import com.r3corda.core.RunOnCallerThread import com.r3corda.core.contracts.StateAndRef import com.r3corda.core.contracts.UniqueIdentifier -import com.r3corda.core.failure import com.r3corda.core.flatMap +import com.r3corda.core.map import com.r3corda.core.node.services.linearHeadsOfType import com.r3corda.core.success import com.r3corda.core.transactions.SignedTransaction @@ -83,15 +83,9 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten extraNodeLabels[node1] = "Fixing event on $nextFixingDate" extraNodeLabels[node2] = "Fixing event on $nextFixingDate" - val retFuture = SettableFuture.create() // Complete the future when the state has been consumed on both nodes val futA = node1.services.vaultService.whenConsumed(theDealRef.ref) val futB = node2.services.vaultService.whenConsumed(theDealRef.ref) - Futures.allAsList(futA, futB) success { - retFuture.set(null) - } failure { throwable -> - retFuture.setException(throwable) - } showProgressFor(listOf(node1, node2)) showConsensusFor(listOf(node1, node2, regulators[0])) @@ -100,7 +94,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten if (nextFixingDate > currentDateAndTime.toLocalDate()) currentDateAndTime = nextFixingDate.atTime(15, 0) - return retFuture + return Futures.allAsList(futA, futB).map { Unit } } private fun startIRSDealBetween(i: Int, j: Int): ListenableFuture { @@ -125,7 +119,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten val instigator = Instigator(node2.info.identity, AutoOffer(notary.info.identity, irs), node1.keyPair!!) val instigatorTx = node1.services.startProtocol("instigator", instigator) - return Futures.transformAsync(Futures.allAsList(instigatorTx, acceptorTx)) { instigatorTx } + return Futures.allAsList(instigatorTx, acceptorTx).flatMap { instigatorTx } } override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? { diff --git a/src/main/kotlin/com/r3corda/simulation/Simulation.kt b/src/main/kotlin/com/r3corda/simulation/Simulation.kt index a80a06b2ce..4864b69dd8 100644 --- a/src/main/kotlin/com/r3corda/simulation/Simulation.kt +++ b/src/main/kotlin/com/r3corda/simulation/Simulation.kt @@ -4,6 +4,7 @@ import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.crypto.generateKeyPair +import com.r3corda.core.flatMap import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.CityDatabase import com.r3corda.core.node.PhysicalLocation @@ -300,15 +301,13 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, } } - @Suppress("unused") // Used from the network visualiser tool. val networkInitialisationFinished: ListenableFuture<*> = Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }) fun start(): ListenableFuture { network.startNodes() // Wait for all the nodes to have finished registering with the network map service. - val startup: ListenableFuture> = Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }) - return Futures.transformAsync(startup) { l: List? -> startMainSimulation() } + return networkInitialisationFinished.flatMap { startMainSimulation() } } /** diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt b/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt index 1221ed603d..67ebd21bef 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/Expect.kt @@ -1,6 +1,6 @@ package com.r3corda.testing -import co.paralleluniverse.strands.SettableFuture +import com.google.common.util.concurrent.SettableFuture import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Observable @@ -131,7 +131,7 @@ fun S.genericExpectEvents( stream: S.((E) -> Unit) -> Unit, expectCompose: () -> ExpectCompose ) { - val finishFuture = SettableFuture() + val finishFuture = SettableFuture.create() /** * Internally we create a "lazy" state automaton. The outgoing edges are state.getExpectedEvents() modulo additional * matching logic. When an event comes we extract the first edge that matches using state.nextState(event), which diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt index 0ce6009694..a7f371c0ac 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt @@ -4,19 +4,17 @@ import com.google.common.jimfs.Jimfs import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.crypto.Party import com.r3corda.core.div import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.TopicSession -import com.r3corda.core.messaging.runOnNextMessage +import com.r3corda.core.messaging.onNext import com.r3corda.core.messaging.send import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.services.KeyManagementService import com.r3corda.core.node.services.ServiceInfo import com.r3corda.core.node.services.VaultService import com.r3corda.core.random63BitValue -import com.r3corda.core.serialization.deserialize import com.r3corda.core.testing.InMemoryVaultService import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.core.utilities.loggerFor @@ -153,12 +151,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, services.networkService.send(TopicSession(topic), payload, target.info.address) } - inline fun receive(topic: String, sessionId: Long): ListenableFuture { - val receive = SettableFuture.create() - services.networkService.runOnNextMessage(topic, sessionId) { - receive.set(it.data.deserialize()) - } - return receive + fun receive(topic: String, sessionId: Long): ListenableFuture { + return services.networkService.onNext(topic, sessionId) } inline fun sendAndReceive(topic: String,