Merged in plt-141-moves-only-part-2 (pull request #92)

File moves to core module and split of interfaces and implementations in preparation for further moves
This commit is contained in:
Rick Parker 2016-05-13 13:22:08 +01:00
commit 3ee601360e
11 changed files with 106 additions and 81 deletions

View File

@ -3,8 +3,8 @@ package core.node
import core.*
import core.crypto.SecureHash
import core.messaging.MessagingService
import core.node.services.IdentityService
import core.node.subsystems.*
import core.node.services.*
import core.utilities.RecordingMap
import java.time.Clock
@ -12,6 +12,8 @@ import java.time.Clock
* A service hub simply vends references to the other services a node has. Some of those services may be missing or
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of
* functionality and you don't want to hard-code which types in the interface.
*
* TODO: Split into a public (to contracts etc) and private (to node) view
*/
interface ServiceHub {
val walletService: WalletService

View File

@ -0,0 +1,43 @@
package core.node.storage
import core.crypto.sha256
import core.protocols.ProtocolStateMachine
import core.serialization.SerializedBytes
/**
* Thread-safe storage of fiber checkpoints.
*
* TODO: Make internal to node again once split [ServiceHub] into a public (to contracts etc) and private (to node) view
*/
interface CheckpointStorage {
/**
* Add a new checkpoint to the store.
*/
fun addCheckpoint(checkpoint: Checkpoint)
/**
* Remove existing checkpoint from the store. It is an error to attempt to remove a checkpoint which doesn't exist
* in the store. Doing so will throw an [IllegalArgumentException].
*/
fun removeCheckpoint(checkpoint: Checkpoint)
/**
* Returns a snapshot of all the checkpoints in the store.
* This may return more checkpoints than were added to this instance of the store; for example if the store persists
* checkpoints to disk.
*/
val checkpoints: Iterable<Checkpoint>
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
data class Checkpoint(
val serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>,
val awaitingTopic: String,
val awaitingObjectOfType: String // java class name
) {
override fun toString(): String {
return "Checkpoint(#serialisedFiber=${serialisedFiber.sha256()}, awaitingTopic=$awaitingTopic, awaitingObjectOfType=$awaitingObjectOfType)"
}
}

View File

@ -4,7 +4,6 @@ import com.google.common.util.concurrent.ListenableFuture
import core.Contract
import core.Party
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.node.NodeInfo
import core.node.services.ServiceType
import org.slf4j.LoggerFactory
@ -60,14 +59,13 @@ interface NetworkMapCache {
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
*
* @param smm state machine manager to use when requesting
* @param net the network messaging service
* @param service the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
*/
fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo,
fun addMapService(net: MessagingService, service: NodeInfo,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
/**
@ -83,11 +81,10 @@ interface NetworkMapCache {
/**
* Deregister from updates from the given map service.
*
* @param smm state machine manager to use when requesting
* @param net the network messaging service
* @param service the network map service to fetch current state from.
*/
fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
}
sealed class NetworkCacheError : Exception() {

View File

@ -0,0 +1,26 @@
package core.protocols
import co.paralleluniverse.fibers.Suspendable
import core.messaging.MessageRecipients
import core.node.ServiceHub
import core.utilities.UntrustworthyData
import org.slf4j.Logger
/**
* The interface of [ProtocolStateMachineImpl] exposing methods and properties required by ProtocolLogic for compilation
*/
interface ProtocolStateMachine<R> {
@Suspendable
fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
obj: Any, recvType: Class<T>): UntrustworthyData<T>
@Suspendable
fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T>
@Suspendable
fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any)
val serviceHub: ServiceHub
val logger: Logger
}

View File

@ -11,6 +11,7 @@ import core.node.ServiceHub
import core.node.storage.Checkpoint
import core.protocols.ProtocolLogic
import core.protocols.ProtocolStateMachine
import core.protocols.ProtocolStateMachineImpl
import core.serialization.SerializedBytes
import core.serialization.THREAD_LOCAL_KRYO
import core.serialization.createKryo
@ -62,7 +63,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
private val checkpointStorage = serviceHub.storageService.checkpointStorage
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val stateMachines = synchronizedMap(HashMap<ProtocolStateMachine<*>, Checkpoint>())
private val stateMachines = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
@ -82,7 +83,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
return stateMachines.keys
.map { it.logic }
.filterIsInstance(klass)
.map { it to (it.psm as ProtocolStateMachine<T>).resultFuture }
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
}
}
@ -95,7 +96,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as ProtocolStateMachine<*>).logger.error("Caught exception from protocol", throwable)
(fiber as ProtocolStateMachineImpl<*>).logger.error("Caught exception from protocol", throwable)
}
restoreCheckpoints()
}
@ -130,13 +131,13 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
}
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachine<*>>): ProtocolStateMachine<*> {
private fun deserializeFiber(serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>): ProtocolStateMachineImpl<*> {
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
return serialisedFiber.deserialize(kryo)
return serialisedFiber.deserialize(kryo) as ProtocolStateMachineImpl<*>
}
private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachine<*>) {
private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachineImpl<*>) {
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${obj.javaClass.name} on topic $topic")
if (psm.logger.isTraceEnabled) {
@ -146,7 +147,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
}
}
private fun initFiber(psm: ProtocolStateMachine<*>, checkpoint: Checkpoint?) {
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[psm] = checkpoint
psm.resultFuture.then(executor) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
@ -165,7 +166,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
*/
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
try {
val fiber = ProtocolStateMachine(logic, scheduler, loggerName)
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
initFiber(fiber, null)
executor.executeASAP {
@ -181,7 +182,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
}
}
private fun replaceCheckpoint(psm: ProtocolStateMachine<*>, newCheckpoint: Checkpoint) {
private fun replaceCheckpoint(psm: ProtocolStateMachineImpl<*>, newCheckpoint: Checkpoint) {
// It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance,
// and the underlying map provided by the database layer is expected to be thread safe.
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
@ -192,11 +193,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
checkpointingMeter.mark()
}
private fun iterateStateMachine(psm: ProtocolStateMachine<*>,
private fun iterateStateMachine(psm: ProtocolStateMachineImpl<*>,
obj: Any?,
resumeFunc: (ProtocolStateMachine<*>) -> Unit) {
resumeFunc: (ProtocolStateMachineImpl<*>) -> Unit) {
executor.checkOnThread()
val onSuspend = fun(request: FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachine<*>>) {
val onSuspend = fun(request: FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// Prepare a listener on the network that runs in the background thread when we received a message.
@ -225,9 +226,9 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
resumeFunc(psm)
}
private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachine<*>,
private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachineImpl<*>,
request: FiberRequest.ExpectingResponse<*>,
serialisedFiber: SerializedBytes<ProtocolStateMachine<*>>) {
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
executor.checkOnThread()
val topic = "${request.topic}.${request.sessionIDForReceive}"
val newCheckpoint = Checkpoint(serialisedFiber, topic, request.responseType.name)

View File

@ -129,11 +129,11 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// TODO: Return a future so the caller knows these operations may not have completed yet, and can monitor
// if needed
updateRegistration(initialNetworkMapAddress, AddOrRemove.ADD)
services.networkMapCache.addMapService(this.smm, net, initialNetworkMapAddress, true, null)
services.networkMapCache.addMapService(net, initialNetworkMapAddress, true, null)
}
if (inNodeNetworkMapService != null) {
// Register for updates
services.networkMapCache.addMapService(this.smm, net, info, true, null)
services.networkMapCache.addMapService(net, info, true, null)
}
}

View File

@ -1,7 +1,5 @@
package core.node.storage
import core.crypto.sha256
import core.protocols.ProtocolStateMachine
import core.serialization.SerializedBytes
import core.serialization.deserialize
import core.serialization.serialize
@ -9,36 +7,10 @@ import core.utilities.loggerFor
import core.utilities.trace
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption.ATOMIC_MOVE
import java.nio.file.StandardCopyOption
import java.util.*
import java.util.Collections.synchronizedMap
import javax.annotation.concurrent.ThreadSafe
/**
* Thread-safe storage of fiber checkpoints.
*/
interface CheckpointStorage {
/**
* Add a new checkpoint to the store.
*/
fun addCheckpoint(checkpoint: Checkpoint)
/**
* Remove existing checkpoint from the store. It is an error to attempt to remove a checkpoint which doesn't exist
* in the store. Doing so will throw an [IllegalArgumentException].
*/
fun removeCheckpoint(checkpoint: Checkpoint)
/**
* Returns a snapshot of all the checkpoints in the store.
* This may return more checkpoints than were added to this instance of the store; for example if the store persists
* checkpoints to disk.
*/
val checkpoints: Iterable<Checkpoint>
}
/**
* File-based checkpoint storage, storing checkpoints per file.
@ -51,7 +23,7 @@ class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage {
private val fileExtension = ".checkpoint"
}
private val checkpointFiles = synchronizedMap(IdentityHashMap<Checkpoint, Path>())
private val checkpointFiles = Collections.synchronizedMap(IdentityHashMap<Checkpoint, Path>())
init {
logger.trace { "Initialising per file checkpoint storage on $storeDir" }
@ -76,7 +48,7 @@ class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage {
private fun atomicWrite(checkpointFile: Path, serialisedCheckpoint: SerializedBytes<Checkpoint>) {
val tempCheckpointFile = checkpointFile.parent.resolve("${checkpointFile.fileName}.tmp")
serialisedCheckpoint.writeToFile(tempCheckpointFile)
Files.move(tempCheckpointFile, checkpointFile, ATOMIC_MOVE)
Files.move(tempCheckpointFile, checkpointFile, StandardCopyOption.ATOMIC_MOVE)
}
override fun removeCheckpoint(checkpoint: Checkpoint) {
@ -92,18 +64,3 @@ class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage {
}
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
data class Checkpoint(
val serialisedFiber: SerializedBytes<ProtocolStateMachine<*>>,
val awaitingTopic: String,
val awaitingObjectOfType: String // java class name
)
{
override fun toString(): String {
return "Checkpoint(#serialisedFiber=${serialisedFiber.sha256()}, awaitingTopic=$awaitingTopic, awaitingObjectOfType=$awaitingObjectOfType)"
}
}

View File

@ -7,7 +7,6 @@ import core.Contract
import core.Party
import core.crypto.SecureHash
import core.messaging.MessagingService
import core.messaging.StateMachineManager
import core.messaging.runOnNextMessage
import core.messaging.send
import core.node.NodeInfo
@ -43,7 +42,7 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value }
override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull()
override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean,
override fun addMapService(net: MessagingService, service: NodeInfo, subscribe: Boolean,
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
@ -95,7 +94,7 @@ open class InMemoryNetworkMapCache() : NetworkMapCache {
*
* @param service the network map service to listen to updates from.
*/
override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val sessionID = random63BitValue()
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)

View File

@ -24,15 +24,15 @@ import org.slf4j.LoggerFactory
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
*/
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler) {
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachine<*>>) -> Unit)? = null
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient lateinit var serviceHub: ServiceHub
@Transient lateinit override var serviceHub: ServiceHub
@Transient private var _logger: Logger? = null
val logger: Logger get() {
override val logger: Logger get() {
return _logger ?: run {
val l = LoggerFactory.getLogger(loggerName)
_logger = l
@ -56,7 +56,7 @@ class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberSched
fun prepareForResumeWith(serviceHub: ServiceHub,
withObject: Any?,
suspendAction: (StateMachineManager.FiberRequest, SerializedBytes<ProtocolStateMachine<*>>) -> Unit) {
suspendAction: (StateMachineManager.FiberRequest, SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit) {
this.suspendAction = suspendAction
this.resumeWithObject = withObject
this.serviceHub = serviceHub
@ -84,20 +84,20 @@ class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberSched
}
@Suspendable @Suppress("UNCHECKED_CAST")
fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
obj: Any, recvType: Class<T>): UntrustworthyData<T> {
override fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
obj: Any, recvType: Class<T>): UntrustworthyData<T> {
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, obj, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
override fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, null, -1, sessionIDForReceive, null, recvType)
return suspendAndExpectReceive(result)
}
@Suspendable
fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
override fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj)
suspend(result)
}

View File

@ -16,7 +16,7 @@ class InMemoryNetworkMapCacheTest {
fun registerWithNetwork() {
val (n0, n1) = network.createTwoNodes()
val future = n1.services.networkMapCache.addMapService(n1.smm, n1.net, n0.info, false, null)
val future = n1.services.networkMapCache.addMapService(n1.net, n0.info, false, null)
network.runNetwork()
future.get()
}