Have ServiceHub entries implement SerializeAsToken so they are not copied into protocol checkpoints.

This commit is contained in:
rick.parker
2016-05-24 08:59:33 +01:00
parent d8940ca88c
commit 8122e35a8a
21 changed files with 254 additions and 140 deletions

View File

@ -7,6 +7,7 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.api.APIServer
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.servlets.AttachmentDownloadServlet
@ -52,7 +53,7 @@ class ConfigurationException(message: String) : Exception(message)
*/
class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration,
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>,
clock: Clock = Clock.systemUTC(),
clock: Clock = NodeClock(),
val clientAPIs: List<Class<*>> = listOf()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */

View File

@ -12,6 +12,7 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
@ -21,7 +22,6 @@ import org.slf4j.Logger
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.time.Clock
import java.util.*
/**
@ -66,7 +66,7 @@ class MockNetwork(private val threadPerNode: Boolean = false,
}
open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork, networkMapAddr: NodeInfo?,
advertisedServices: Set<ServiceType>, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) {
advertisedServices: Set<ServiceType>, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, NodeClock()) {
override val log: Logger = loggerFor<MockNode>()
override val serverThread: AffinityExecutor =
if (mockNet.threadPerNode)

View File

@ -0,0 +1,35 @@
package com.r3corda.node.serialization
import com.r3corda.core.serialization.SerializeAsToken
import com.r3corda.core.serialization.SerializeAsTokenContext
import com.r3corda.core.serialization.SingletonSerializationToken
import java.time.Clock
import java.time.Instant
import java.time.ZoneId
import javax.annotation.concurrent.ThreadSafe
/**
* A [Clock] that tokenizes itself when serialized, and delegates to an underlying [Clock] implementation.
*/
@ThreadSafe
class NodeClock(private val delegateClock: Clock = Clock.systemUTC()) : Clock(), SerializeAsToken {
private val token = SingletonSerializationToken(this)
override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context)
override fun instant(): Instant {
return delegateClock.instant()
}
// Do not use this. Instead seek to use ZonedDateTime methods.
override fun withZone(zone: ZoneId): Clock {
throw UnsupportedOperationException("Tokenized clock does not support withZone()")
}
override fun getZone(): ZoneId {
return delegateClock.zone
}
}

View File

@ -1,10 +1,11 @@
package com.r3corda.node.services.api
import com.codahale.metrics.MetricRegistry
import com.r3corda.core.serialization.SingletonSerializeAsToken
/**
* Provides access to various metrics and ways to notify monitoring services of things, for sysadmin purposes.
* This is not an interface because it is too lightweight to bother mocking out.
*/
class MonitoringService(val metrics: MetricRegistry)
class MonitoringService(val metrics: MetricRegistry) : SingletonSerializeAsToken()

View File

@ -2,6 +2,7 @@ package com.r3corda.node.services.identity
import com.r3corda.core.crypto.Party
import com.r3corda.core.node.services.IdentityService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import java.security.PublicKey
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
@ -10,7 +11,7 @@ import javax.annotation.concurrent.ThreadSafe
* Simple identity service which caches parties and provides functionality for efficient lookup.
*/
@ThreadSafe
class InMemoryIdentityService() : IdentityService {
class InMemoryIdentityService() : SingletonSerializeAsToken(), IdentityService {
private val keyToParties = ConcurrentHashMap<PublicKey, Party>()
private val nameToParties = ConcurrentHashMap<String, Party>()

View File

@ -3,6 +3,7 @@ package com.r3corda.node.services.keys
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.node.services.KeyManagementService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe
* etc
*/
@ThreadSafe
class E2ETestKeyManagementService : KeyManagementService {
class E2ETestKeyManagementService() : SingletonSerializeAsToken(), KeyManagementService {
private class InnerState {
val keys = HashMap<PublicKey, PrivateKey>()
}

View File

@ -4,8 +4,9 @@ import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox
import com.r3corda.core.messaging.*
import com.r3corda.node.internal.Node
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.*
@ -52,7 +53,7 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
val defaultExecutor: Executor = RunOnCallerThread) : MessagingService {
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService {
// In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient

View File

@ -6,6 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.sha256
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import org.slf4j.LoggerFactory
@ -28,7 +29,7 @@ import kotlin.concurrent.thread
* testing).
*/
@ThreadSafe
class InMemoryMessagingNetwork {
class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
companion object {
val MESSAGES_LOG_NAME = "messages"
private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME)
@ -167,7 +168,7 @@ class InMemoryMessagingNetwork {
* An instance can be obtained by creating a builder and then using the start method.
*/
@ThreadSafe
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : MessagingService {
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService {
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration

View File

@ -15,6 +15,7 @@ import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.RegulatorService
@ -30,7 +31,7 @@ import javax.annotation.concurrent.ThreadSafe
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : NetworkMapCache {
open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>

View File

@ -1,15 +1,8 @@
/*
* Copyright 2016 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package com.r3corda.node.services.network
import co.paralleluniverse.common.util.VisibleForTesting
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.DummyPublicKey
import com.r3corda.core.crypto.Party
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo

View File

@ -1,10 +1,11 @@
package com.r3corda.node.services.persistence
import com.r3corda.core.crypto.Party
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.AttachmentStorage
import com.r3corda.core.node.services.StorageService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.RecordingMap
import org.slf4j.LoggerFactory
import java.security.KeyPair
@ -15,7 +16,7 @@ open class StorageServiceImpl(override val attachments: AttachmentStorage,
override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public),
// This parameter is for unit tests that want to observe operation details.
val recordingAs: (String) -> String = { tableName -> "" })
: StorageService {
: SingletonSerializeAsToken(), StorageService {
protected val tables = HashMap<String, MutableMap<*, *>>()
private fun <K, V> getMapOriginal(tableName: String): MutableMap<K, V> {

View File

@ -3,17 +3,11 @@ package com.r3corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.createKryo
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.UntrustworthyData
import com.r3corda.node.services.api.ServiceHubInternal
import org.slf4j.Logger
@ -30,7 +24,7 @@ import org.slf4j.LoggerFactory
class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit)? = null
@Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, fiber: ProtocolStateMachineImpl<*>) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient lateinit override var serviceHub: ServiceHubInternal
@ -59,7 +53,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
fun prepareForResumeWith(serviceHub: ServiceHubInternal,
withObject: Any?,
suspendAction: (StateMachineManager.FiberRequest, SerializedBytes<ProtocolStateMachineImpl<*>>) -> Unit) {
suspendAction: (StateMachineManager.FiberRequest, ProtocolStateMachineImpl<*>) -> Unit) {
this.suspendAction = suspendAction
this.resumeWithObject = withObject
this.serviceHub = serviceHub
@ -108,10 +102,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>, scheduler: FiberS
@Suspendable
private fun suspend(with: StateMachineManager.FiberRequest) {
parkAndSerialize { fiber, serializer ->
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
suspendAction!!(with, this.serialize(kryo))
suspendAction!!(with, this)
}
}

View File

@ -12,10 +12,7 @@ import com.r3corda.core.messaging.runOnNextMessage
import com.r3corda.core.messaging.send
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolStateMachine
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.THREAD_LOCAL_KRYO
import com.r3corda.core.serialization.createKryo
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.*
import com.r3corda.core.then
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.core.utilities.trace
@ -51,8 +48,6 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Make Kryo (de)serialize markers for heavy objects that are currently in the service hub. This avoids mistakes
* where services are temporarily put on the stack.
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
*/
@ThreadSafe
@ -76,6 +71,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private val totalStartedProtocols = metrics.counter("Protocols.Started")
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
// Context for tokenized services in checkpoints
private val serializationContext = SerializeAsTokenContext(serviceHub)
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
fun <T> findStateMachines(klass: Class<out ProtocolLogic<T>>): List<Pair<ProtocolLogic<T>, ListenableFuture<T>>> {
synchronized(stateMachines) {
@ -139,6 +137,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private fun deserializeFiber(serialisedFiber: SerializedBytes<out ProtocolStateMachine<*>>): ProtocolStateMachineImpl<*> {
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return serialisedFiber.deserialize(kryo) as ProtocolStateMachineImpl<*>
}
@ -202,9 +202,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
obj: Any?,
resumeFunc: (ProtocolStateMachineImpl<*>) -> Unit) {
executor.checkOnThread()
val onSuspend = fun(request: FiberRequest, serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
val onSuspend = fun(request: FiberRequest, fiber: ProtocolStateMachineImpl<*>) {
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
val serialisedFiber = fiber.serialize(kryo)
// Prepare a listener on the network that runs in the background thread when we received a message.
checkpointAndSetupMessageHandler(psm, request, serialisedFiber)
}

View File

@ -8,6 +8,7 @@ import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.Wallet
import com.r3corda.core.node.services.WalletService
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.ServiceHubInternal
@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe
* states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed.
*/
@ThreadSafe
class NodeWalletService(private val services: ServiceHubInternal) : WalletService {
class NodeWalletService(private val services: ServiceHubInternal) : SingletonSerializeAsToken(), WalletService {
private val log = loggerFor<NodeWalletService>()
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're

View File

@ -5,6 +5,7 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService
@ -40,7 +41,7 @@ class MockServices(
val storage: StorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapService: NetworkMapService? = null,
val overrideClock: Clock? = Clock.systemUTC()
val overrideClock: Clock? = NodeClock()
) : ServiceHubInternal {
override val walletService: WalletService = customWallet ?: NodeWalletService(this)