Follow refactoring CHECKPOINT_CONTEXT from SerializationDefaults to CheckpointSerializationDefaults etc in Enterprise only code.

This commit is contained in:
szymonsztuka 2018-09-20 17:09:23 +01:00
parent 1c681ad29a
commit cd8b802fcb
2 changed files with 17 additions and 13 deletions

View File

@ -19,6 +19,10 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
@ -35,7 +39,7 @@ import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.SerializeAsTokenContextImpl
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch
import rx.Observable
@ -110,7 +114,7 @@ class MultiThreadedStateMachineManager(
private val transitionExecutor = makeTransitionExecutor()
private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: SerializationContext? = null
private var checkpointSerializationContext: CheckpointSerializationContext? = null
private var tokenizableServices: List<Any>? = null
private var actionExecutor: ActionExecutor? = null
@ -134,8 +138,8 @@ class MultiThreadedStateMachineManager(
override fun start(tokenizableServices: List<Any>) {
checkQuasarJavaAgentPresence()
this.tokenizableServices = tokenizableServices
val checkpointSerializationContext = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
SerializeAsTokenContextImpl(tokenizableServices, SerializationDefaults.SERIALIZATION_FACTORY, SerializationDefaults.CHECKPOINT_CONTEXT, serviceHub)
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
CheckpointSerializeAsTokenContextImpl(tokenizableServices, CheckpointSerializationDefaults.CHECKPOINT_SERIALIZATION_FACTORY, CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, serviceHub)
)
this.checkpointSerializationContext = checkpointSerializationContext
this.actionExecutor = makeActionExecutor(checkpointSerializationContext)
@ -535,7 +539,7 @@ class MultiThreadedStateMachineManager(
val resultFuture = openFuture<Any?>()
flowStateMachineImpl.transientValues = TransientReference(createTransientValues(flowId, resultFuture))
flowLogic.stateMachine = flowStateMachineImpl
val frozenFlowLogic = (flowLogic as FlowLogic<*>).serialize(context = checkpointSerializationContext!!)
val frozenFlowLogic = (flowLogic as FlowLogic<*>).checkpointSerialize(context = checkpointSerializationContext!!)
val flowCorDappVersion = FlowStateMachineImpl.createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, flowCorDappVersion).getOrThrow()
@ -616,7 +620,7 @@ class MultiThreadedStateMachineManager(
private fun deserializeCheckpoint(serializedCheckpoint: SerializedBytes<Checkpoint>): Checkpoint? {
return try {
serializedCheckpoint.deserialize(context = checkpointSerializationContext!!)
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext!!)
} catch (exception: Throwable) {
logger.error("Encountered unrestorable checkpoint!", exception)
null
@ -661,7 +665,7 @@ class MultiThreadedStateMachineManager(
val resultFuture = openFuture<Any?>()
val fiber = when (flowState) {
is FlowState.Unstarted -> {
val logic = flowState.frozenFlowLogic.deserialize(context = checkpointSerializationContext!!)
val logic = flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext!!)
val state = StateMachineState(
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
@ -680,7 +684,7 @@ class MultiThreadedStateMachineManager(
fiber
}
is FlowState.Started -> {
val fiber = flowState.frozenFiber.deserialize(context = checkpointSerializationContext!!)
val fiber = flowState.frozenFiber.checkpointDeserialize(context = checkpointSerializationContext!!)
val state = StateMachineState(
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
@ -740,7 +744,7 @@ class MultiThreadedStateMachineManager(
}
}
private fun makeActionExecutor(checkpointSerializationContext: SerializationContext): ActionExecutor {
private fun makeActionExecutor(checkpointSerializationContext: CheckpointSerializationContext): ActionExecutor {
return ActionExecutorImpl(
serviceHub,
checkpointStorage,

View File

@ -14,9 +14,9 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.TestClock
import org.junit.Assert
@ -70,7 +70,7 @@ class FlowStateMachineComparatorTest {
val sm1 = FlowStateMachineImpl<Unit>(StateMachineRunId(UUID.randomUUID()),
scheduler = scheduler,
logic = EmptyFlow, creationTime = clock.millis())
val sm2 = sm1.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT).deserialize(context = SerializationDefaults.CHECKPOINT_CONTEXT)
val sm2 = sm1.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).checkpointDeserialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
Fiber.unparkDeserialized(sm2, scheduler)
val comparator = FlowStateMachineComparator()