Merged in cor-133-serialization-as-tokens-2 (pull request #129)

Fix registration of services due to Kryo not following synthetic fields by default
This commit is contained in:
Rick Parker 2016-06-07 16:31:39 +01:00
commit f6e577f672
3 changed files with 13 additions and 9 deletions

View File

@ -25,7 +25,7 @@ import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class MockIdentityService(val identities: List<Party>) : IdentityService {
class MockIdentityService(val identities: List<Party>) : IdentityService, SingletonSerializeAsToken() {
private val keyToParties: Map<PublicKey, Party>
get() = synchronized(identities) { identities.associateBy { it.owningKey } }
private val nameToParties: Map<String, Party>

View File

@ -126,12 +126,12 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
storage = storageServices.first
checkpointStorage = storageServices.second
net = makeMessagingService()
smm = StateMachineManager(services, checkpointStorage, serverThread)
wallet = NodeWalletService(services)
keyManagement = E2ETestKeyManagementService()
makeInterestRatesOracleService()
api = APIServerImpl(this)
identity = makeIdentityService()
api = APIServerImpl(this)
smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread)
// This object doesn't need to be referenced from this class because it registers handlers on the network
// service and so that keeps it from being collected.

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.abbreviate
@ -49,7 +50,7 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List<Any>, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) {
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
val scheduler = FiberScheduler()
@ -70,7 +71,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
// Context for tokenized services in checkpoints
private val serializationContext = SerializeAsTokenContext(serviceHub)
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@ -130,13 +131,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return serialisedFiber.deserialize(kryo)
}
private fun quasarKryo(): Kryo {
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
return createKryo(serializer.kryo)
}
private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) {
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${payload?.javaClass?.name} on topic $topic")
@ -214,8 +219,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer
val kryo = createKryo(deserializer.kryo)
val kryo = quasarKryo()
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
val serialisedFiber = fiber.serialize(kryo)