CORDA-3717: Apply custom serializers to checkpoints

This commit is contained in:
Joseph Zuniga-Daly 2020-06-19 11:18:06 +01:00
parent 6485a025c7
commit a66e140185
6 changed files with 147 additions and 4 deletions

View File

@ -0,0 +1,54 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.serialization.internal.amqp.CORDAPP_TYPE
import net.corda.serialization.internal.amqp.PROXY_TYPE
import java.lang.reflect.Type
import kotlin.reflect.jvm.javaType
import kotlin.reflect.jvm.jvmErasure
class CustomSerializerCheckpointAdaptor<OBJ, PROXY>(private val userSerializer : SerializationCustomSerializer<OBJ, PROXY>) : Serializer<OBJ>() {
val type: Type
val proxyType: Type
init {
val types = userSerializer::class.supertypes.filter { it.jvmErasure == SerializationCustomSerializer::class }
.flatMap { it.arguments }
.map { it.type!!.javaType }
if (types.size != 2) {
throw CustomSerializerCheckpointAdaptorException("Unable to determine serializer parent types")
}
type = types[CORDAPP_TYPE]
proxyType = types[PROXY_TYPE]
}
override fun write(kryo: Kryo, output: Output, obj: OBJ) {
try {
kryo.writeClassAndObject(output, userSerializer.toProxy(obj))
} catch (e: Exception) {
throw CustomSerializerCheckpointAdaptorException("Failed converting ${type.typeName} to ${proxyType.typeName}", e)
}
}
override fun read(kryo: Kryo, input: Input, type: Class<OBJ>): OBJ {
try {
@Suppress("UNCHECKED_CAST")
return userSerializer.fromProxy(kryo.readClassAndObject(input) as PROXY)
} catch (e: Exception) {
throw CustomSerializerCheckpointAdaptorException("Failed converting ${proxyType.typeName} to ${this.type.typeName}", e)
}
}
}
class CustomSerializerCheckpointAdaptorException : java.lang.Exception {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(message, cause, enableSuppression, writableStackTrace)
}

View File

@ -11,6 +11,7 @@ import com.esotericsoftware.kryo.pool.KryoPool
import com.esotericsoftware.kryo.serializers.ClosureSerializer import com.esotericsoftware.kryo.serializers.ClosureSerializer
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -41,6 +42,7 @@ private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>()
object KryoCheckpointSerializer : CheckpointSerializer { object KryoCheckpointSerializer : CheckpointSerializer {
private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>() private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>()
private var cordappSerializers: List<CustomSerializerCheckpointAdaptor<out Any?, out Any?>> = listOf()
private fun getPool(context: CheckpointSerializationContext): KryoPool { private fun getPool(context: CheckpointSerializationContext): KryoPool {
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
@ -51,6 +53,19 @@ object KryoCheckpointSerializer : CheckpointSerializer {
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true } val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
serializer.kryo.apply { serializer.kryo.apply {
field.set(this, classResolver) field.set(this, classResolver)
for (customSerializer in cordappSerializers) {
val typeName = customSerializer.type.typeName.substringBefore('<')
val clazz = context.deserializationClassLoader.loadClass(typeName)
if (clazz.isInterface){
addDefaultSerializer(clazz, customSerializer)
} else {
register(clazz, customSerializer)
}
}
// don't allow overriding the public key serializer for checkpointing // don't allow overriding the public key serializer for checkpointing
DefaultKryoCustomizer.customize(this) DefaultKryoCustomizer.customize(this)
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector) addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
@ -120,6 +135,10 @@ object KryoCheckpointSerializer : CheckpointSerializer {
}) })
} }
} }
fun addCordappSerializers(customSerializers: Collection<SerializationCustomSerializer<*, *>>) {
cordappSerializers = customSerializers.map { CustomSerializerCheckpointAdaptor(it) }
}
} }
val KRYO_CHECKPOINT_CONTEXT = CheckpointSerializationContextImpl( val KRYO_CHECKPOINT_CONTEXT = CheckpointSerializationContextImpl(

View File

@ -191,7 +191,8 @@ dependencies {
// Integration test helpers // Integration test helpers
integrationTestCompile "junit:junit:$junit_version" integrationTestCompile "junit:junit:$junit_version"
integrationTestCompile "org.assertj:assertj-core:${assertj_version}" integrationTestCompile "org.assertj:assertj-core:${assertj_version}"
integrationTestCompile "com.github.andrewoma.dexx:kollection:0.7"
// BFT-Smart dependencies // BFT-Smart dependencies
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87' compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
compile 'commons-codec:commons-codec:1.13' compile 'commons-codec:commons-codec:1.13'

View File

@ -0,0 +1,68 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.github.andrewoma.dexx.kollection.ImmutableMap
import com.github.andrewoma.dexx.kollection.immutableMapOf
import com.github.andrewoma.dexx.kollection.toImmutableMap
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Duration
class MockNetworkCustomSerializerCheckpointTest{
private lateinit var mockNetwork: MockNetwork
@Before
fun setup() {
mockNetwork = MockNetwork(MockNetworkParameters(cordappsForAllNodes = listOf(enclosedCordapp())))
}
@After
fun shutdown() {
mockNetwork.stopNodes()
}
@Test(timeout = 300_000)
fun `flow suspend with custom kryo serializer`() {
val node = mockNetwork.createPartyNode()
val expected = 5
val actual = node.startFlow(TestFlow(5)).get()
Assertions.assertThat(actual).isEqualTo(expected)
}
@StartableByRPC
class TestFlow(private val purchase: Int) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
// This object is difficult to serialize with Kryo
val difficultToSerialize: ImmutableMap<String, Int> = immutableMapOf("foo" to purchase)
// Force a checkpoint
sleep(Duration.ofSeconds(10), maySkipCheckpoint = false)
// Return value from deserialized object
return difficultToSerialize["foo"] ?: 0
}
}
@Suppress("unused")
class TestSerializer : SerializationCustomSerializer<ImmutableMap<String, Int>, HashMap<String, Int>> {
override fun toProxy(obj: ImmutableMap<String, Int>): HashMap<String, Int> {
val proxy = HashMap<String, Int>()
return obj.toMap(proxy)
}
override fun fromProxy(proxy: HashMap<String, Int>): ImmutableMap<String, Int> {
return proxy.toImmutableMap()
}
}
}

View File

@ -643,7 +643,7 @@ open class Node(configuration: NodeConfiguration,
rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null, //even Shell embeded in the node connects via RPC to the node rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null, //even Shell embeded in the node connects via RPC to the node
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
checkpointSerializer = KryoCheckpointSerializer, checkpointSerializer = KryoCheckpointSerializer.also { checkpointSerializer -> checkpointSerializer.addCordappSerializers(cordappLoader.cordapps.flatMap { it.serializationCustomSerializers }) },
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader) checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader)
) )
} }

View File

@ -25,8 +25,9 @@ fun createTestSerializationEnv(): SerializationEnvironment {
} }
fun createTestSerializationEnv(classLoader: ClassLoader?): SerializationEnvironment { fun createTestSerializationEnv(classLoader: ClassLoader?): SerializationEnvironment {
var customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet()
val (clientSerializationScheme, serverSerializationScheme) = if (classLoader != null) { val (clientSerializationScheme, serverSerializationScheme) = if (classLoader != null) {
val customSerializers = createInstancesOfClassesImplementing(classLoader, SerializationCustomSerializer::class.java) customSerializers = createInstancesOfClassesImplementing(classLoader, SerializationCustomSerializer::class.java)
val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, classLoader).toSet() val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, classLoader).toSet()
Pair(AMQPClientSerializationScheme(customSerializers, serializationWhitelists), Pair(AMQPClientSerializationScheme(customSerializers, serializationWhitelists),
@ -45,7 +46,7 @@ fun createTestSerializationEnv(classLoader: ClassLoader?): SerializationEnvironm
AMQP_RPC_CLIENT_CONTEXT, AMQP_RPC_CLIENT_CONTEXT,
AMQP_STORAGE_CONTEXT, AMQP_STORAGE_CONTEXT,
KRYO_CHECKPOINT_CONTEXT, KRYO_CHECKPOINT_CONTEXT,
KryoCheckpointSerializer KryoCheckpointSerializer.also { it.addCordappSerializers(customSerializers) }
) )
} }