Merged in R3-CEV/internal/simulation-changes (pull request #137)

Simulation changes
This commit is contained in:
Andras Slemmer 2016-06-13 18:10:01 +01:00
commit 71cbea46e3
4 changed files with 26 additions and 9 deletions

View File

@ -1,6 +1,8 @@
package com.r3corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.JdkFutureAdapters
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread
@ -47,6 +49,7 @@ import java.security.KeyPair
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.concurrent.CompletableFuture
/**
* A base node implementation that can be customised either for production (with real implementations that do real
@ -109,9 +112,10 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
var isPreviousCheckpointsPresent = false
private set
/** Completes once the node has successfully registered with the network map service. Null until [start] returns. */
@Volatile var networkMapRegistrationFuture: ListenableFuture<Unit>? = null
private set
/** Completes once the node has successfully registered with the network map service */
private val _networkMapRegistrationFuture: SettableFuture<Unit> = SettableFuture.create()
val networkMapRegistrationFuture: ListenableFuture<Unit>
get() = _networkMapRegistrationFuture
/** Set to true once [start] has been successfully called. */
@Volatile var started = false
@ -145,7 +149,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
CashBalanceAsMetricsObserver(services)
startMessagingService()
networkMapRegistrationFuture = registerWithNetworkMap()
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
isPreviousCheckpointsPresent = checkpointStorage.checkpoints.any()
smm.start()
started = true

View File

@ -1,5 +1,6 @@
package com.r3corda.node.internal.testing
import com.google.common.base.Function
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.node.CityDatabase
@ -213,6 +214,9 @@ abstract class Simulation(val runAsync: Boolean,
}
}
val networkInitialisationFinished: ListenableFuture<*> =
Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture })
fun start(): ListenableFuture<Unit> {
network.startNodes()
// Wait for all the nodes to have finished registering with the network map service.
@ -246,4 +250,4 @@ abstract class Simulation(val runAsync: Boolean,
}
next(0, 1)
}
}
}

View File

@ -81,7 +81,8 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
private val _allMessages = PublishSubject.create<MessageTransfer>()
/** A stream of (sender, message, recipients) triples */
val allMessages: Observable<MessageTransfer> = _allMessages
val allMessages: Observable<MessageTransfer>
get() = _allMessages
interface LatencyCalculator {
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
@ -104,7 +105,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
} else {
msgSendInternal(transfer)
}
_allMessages.onNext(MessageTransfer(from, message, recipients))
}
private fun msgSendInternal(transfer: MessageTransfer) {
@ -256,7 +256,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
private fun pumpInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val transfer = (if (block) q.take() else q.poll()) ?: return null
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
@ -277,6 +276,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
_allMessages.onNext(transfer)
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)

View File

@ -6,7 +6,9 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.base.Throwables
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.abbreviate
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.runOnNextMessage
@ -62,6 +64,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
// property.
private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
// A map from fibers to futures that will be completed when the last corresponding checkpoint is removed
private val finalCheckpointRemovedFutures = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, SettableFuture<Unit>>())
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -178,12 +183,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[psm] = checkpoint
notifyChangeObservers(psm, AddOrRemove.ADD)
val finalCheckpointRemovedFuture: SettableFuture<Unit> = SettableFuture.create()
finalCheckpointRemovedFutures[psm] = finalCheckpointRemovedFuture
psm.resultFuture.then(executor) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(psm)
if (finalCheckpoint != null) {
checkpointStorage.removeCheckpoint(finalCheckpoint)
}
finalCheckpointRemovedFuture.set(Unit)
totalFinishedProtocols.inc()
notifyChangeObservers(psm, AddOrRemove.REMOVE)
}
@ -206,7 +214,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
}
totalStartedProtocols.inc()
}
return fiber.resultFuture
val finalCheckpointRemovedFuture = finalCheckpointRemovedFutures.remove(fiber)
return Futures.transformAsync(finalCheckpointRemovedFuture, { fiber.resultFuture })
} catch(e: Throwable) {
e.printStackTrace()
throw e