mirror of
https://github.com/corda/corda.git
synced 2025-06-13 20:58:19 +00:00
CORDA-3717: Apply custom serializers to checkpoints (#6392)
* CORDA-3717: Apply custom serializers to checkpoints * Remove try/catch to fix TooGenericExceptionCaught detekt rule * Rename exception * Extract method * Put calls to the userSerializer on their own lines to improve readability * Remove unused constructors from exception * Remove unused proxyType field * Give field a descriptive name * Explain why we are looking for two type parameters when we only use one * Tidy up the fetching of types * Use 0 seconds when forcing a flow checkpoint inside test * Add test to check references are restored correctly * Add CheckpointCustomSerializer interface * Wire up the new CheckpointCustomSerializer interface * Use kryo default for abstract classes * Remove unused imports * Remove need for external library in tests * Make file match original to remove from diff * Remove maySkipCheckpoint from calls to sleep * Add newline to end of file * Test custom serializers mapped to interfaces * Test serializer configured with abstract class * Move test into its own package * Rename test * Move flows and serializers into their own source file * Move broken map into its own source file * Delete comment now source file is simpler * Rename class to have a shorter name * Add tests that run the checkpoint serializer directly * Check serialization of final classes * Register as default unless the target class is final * Test PublicKey serializer has not been overridden * Add a broken serializer for EdDSAPublicKey to make test more robust * Split serializer registration into default and non-default registrations. Run registrations at the right time to preserve Cordas own custom serializers. * Check for duplicate custom checkpoint serializers * Add doc comments * Add doc comments to CustomSerializerCheckpointAdaptor * Add test to check duplicate serializers are logged * Do not log the duplicate serializer warning when the duplicate is the same class * Update doc comment for CheckpointCustomSerializer * Sort serializers by classname so we are not registering in an unknown or random order * Add test to serialize a class that references itself * Store custom serializer type in the Kryo stream so we can spot when a different serializer is being used to deserialize * Testing has shown that registering custom serializers as default is more robust when adding new cordapps * Remove new line character * Remove unused imports * Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt * Remove comment * Update comment on exception * Make CustomSerializerCheckpointAdaptor internal * Revert "Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt" This reverts commitb835de79bd
. * Restore "Add interface net.corda.core.serialization.CheckpointCustomSerializer to api-current.txt"" This reverts commit718873a4e9
. * Pass the class loader instead of the context * Do less work in test setup * Make the serialization context unique for CustomCheckpointSerializerTest so we get a new Kryo pool for the test * Rebuild the Kryo pool for the given context when we change custom serializers * Rebuild all Kryo pools on serializer change to keep serializer list consistent * Move the custom serializer list into CheckpointSerializationContext to reduce scope from global to a serialization context * Remove unused imports * Make the new checkpointCustomSerializers property default to the empty list * Delegate implementation using kotlin language feature
This commit is contained in:
committed by
GitHub
parent
a41152edf6
commit
c33720c73d
@ -0,0 +1,103 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.serialization.CheckpointCustomSerializer
|
||||
import net.corda.serialization.internal.amqp.CORDAPP_TYPE
|
||||
import java.lang.reflect.Type
|
||||
import kotlin.reflect.jvm.javaType
|
||||
import kotlin.reflect.jvm.jvmErasure
|
||||
|
||||
/**
|
||||
* Adapts CheckpointCustomSerializer for use in Kryo
|
||||
*/
|
||||
internal class CustomSerializerCheckpointAdaptor<OBJ, PROXY>(private val userSerializer : CheckpointCustomSerializer<OBJ, PROXY>) : Serializer<OBJ>() {
|
||||
|
||||
/**
|
||||
* The class name of the serializer we are adapting.
|
||||
*/
|
||||
val serializerName: String = userSerializer.javaClass.name
|
||||
|
||||
/**
|
||||
* The input type of this custom serializer.
|
||||
*/
|
||||
val cordappType: Type
|
||||
|
||||
/**
|
||||
* Check we have access to the types specified on the CheckpointCustomSerializer interface.
|
||||
*
|
||||
* Throws UnableToDetermineSerializerTypesException if the types are missing.
|
||||
*/
|
||||
init {
|
||||
val types: List<Type> = userSerializer::class
|
||||
.supertypes
|
||||
.filter { it.jvmErasure == CheckpointCustomSerializer::class }
|
||||
.flatMap { it.arguments }
|
||||
.mapNotNull { it.type?.javaType }
|
||||
|
||||
// We are expecting a cordapp type and a proxy type.
|
||||
// We will only use the cordapp type in this class
|
||||
// but we want to check both are present.
|
||||
val typeParameterCount = 2
|
||||
if (types.size != typeParameterCount) {
|
||||
throw UnableToDetermineSerializerTypesException("Unable to determine serializer parent types")
|
||||
}
|
||||
cordappType = types[CORDAPP_TYPE]
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialize obj to the Kryo stream.
|
||||
*/
|
||||
override fun write(kryo: Kryo, output: Output, obj: OBJ) {
|
||||
|
||||
fun <T> writeToKryo(obj: T) = kryo.writeClassAndObject(output, obj)
|
||||
|
||||
// Write serializer type
|
||||
writeToKryo(serializerName)
|
||||
|
||||
// Write proxy object
|
||||
writeToKryo(userSerializer.toProxy(obj))
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize an object from the Kryo stream.
|
||||
*/
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<OBJ>): OBJ {
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun <T> readFromKryo() = kryo.readClassAndObject(input) as T
|
||||
|
||||
// Check the serializer type
|
||||
checkSerializerType(readFromKryo())
|
||||
|
||||
// Read the proxy object
|
||||
return userSerializer.fromProxy(readFromKryo())
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws a `CustomCheckpointSerializersHaveChangedException` if the serializer type in the kryo stream does not match the serializer
|
||||
* type for this custom serializer.
|
||||
*
|
||||
* @param checkpointSerializerType Serializer type from the Kryo stream
|
||||
*/
|
||||
private fun checkSerializerType(checkpointSerializerType: String) {
|
||||
if (checkpointSerializerType != serializerName)
|
||||
throw CustomCheckpointSerializersHaveChangedException("The custom checkpoint serializers have changed while checkpoints exist. " +
|
||||
"Please restore the CorDapps to when this checkpoint was created.")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown when the input/output types are missing from the custom serializer.
|
||||
*/
|
||||
class UnableToDetermineSerializerTypesException(message: String) : RuntimeException(message)
|
||||
|
||||
/**
|
||||
* Thrown when the custom serializer is found to be reading data from another type of custom serializer.
|
||||
*
|
||||
* This was expected to happen if the user adds or removes CorDapps while checkpoints exist but it turned out that registering serializers
|
||||
* as default made the system reliable.
|
||||
*/
|
||||
class CustomCheckpointSerializersHaveChangedException(message: String) : RuntimeException(message)
|
@ -10,12 +10,14 @@ import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.CheckpointCustomSerializer
|
||||
import net.corda.core.serialization.ClassWhitelist
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.CheckpointSerializer
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.serialization.internal.AlwaysAcceptEncodingWhitelist
|
||||
import net.corda.serialization.internal.ByteBufferInputStream
|
||||
import net.corda.serialization.internal.CheckpointSerializationContextImpl
|
||||
@ -40,10 +42,10 @@ private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>()
|
||||
}
|
||||
|
||||
object KryoCheckpointSerializer : CheckpointSerializer {
|
||||
private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>()
|
||||
private val kryoPoolsForContexts = ConcurrentHashMap<Triple<ClassWhitelist, ClassLoader, Iterable<CheckpointCustomSerializer<*,*>>>, KryoPool>()
|
||||
|
||||
private fun getPool(context: CheckpointSerializationContext): KryoPool {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Triple(context.whitelist, context.deserializationClassLoader, context.checkpointCustomSerializers)) {
|
||||
KryoPool.Builder {
|
||||
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
||||
val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) }
|
||||
@ -56,12 +58,60 @@ object KryoCheckpointSerializer : CheckpointSerializer {
|
||||
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
|
||||
classLoader = it.second
|
||||
|
||||
// Add custom serializers
|
||||
val customSerializers = buildCustomSerializerAdaptors(context)
|
||||
warnAboutDuplicateSerializers(customSerializers)
|
||||
val classToSerializer = mapInputClassToCustomSerializer(context.deserializationClassLoader, customSerializers)
|
||||
addDefaultCustomSerializers(this, classToSerializer)
|
||||
}
|
||||
}.build()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a sorted list of CustomSerializerCheckpointAdaptor based on the custom serializers inside context.
|
||||
*
|
||||
* The adaptors are sorted by serializerName which maps to javaClass.name for the serializer class
|
||||
*/
|
||||
private fun buildCustomSerializerAdaptors(context: CheckpointSerializationContext) =
|
||||
context.checkpointCustomSerializers.map { CustomSerializerCheckpointAdaptor(it) }.sortedBy { it.serializerName }
|
||||
|
||||
/**
|
||||
* Returns a list of pairs where the first element is the input class of the custom serializer and the second element is the
|
||||
* custom serializer.
|
||||
*/
|
||||
private fun mapInputClassToCustomSerializer(classLoader: ClassLoader, customSerializers: Iterable<CustomSerializerCheckpointAdaptor<*, *>>) =
|
||||
customSerializers.map { getInputClassForCustomSerializer(classLoader, it) to it }
|
||||
|
||||
/**
|
||||
* Returns the Class object for the serializers input type.
|
||||
*/
|
||||
private fun getInputClassForCustomSerializer(classLoader: ClassLoader, customSerializer: CustomSerializerCheckpointAdaptor<*, *>): Class<*> {
|
||||
val typeNameWithoutGenerics = customSerializer.cordappType.typeName.substringBefore('<')
|
||||
return classLoader.loadClass(typeNameWithoutGenerics)
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a warning if two or more custom serializers are found for the same input type.
|
||||
*/
|
||||
private fun warnAboutDuplicateSerializers(customSerializers: Iterable<CustomSerializerCheckpointAdaptor<*,*>>) =
|
||||
customSerializers
|
||||
.groupBy({ it.cordappType }, { it.serializerName })
|
||||
.filter { (_, serializerNames) -> serializerNames.distinct().size > 1 }
|
||||
.forEach { (inputType, serializerNames) -> loggerFor<KryoCheckpointSerializer>().warn("Duplicate custom checkpoint serializer for type $inputType. Serializers: ${serializerNames.joinToString(", ")}") }
|
||||
|
||||
/**
|
||||
* Register all custom serializers as default, this class + subclass, registrations.
|
||||
*
|
||||
* Serializers registered before this will take priority. This needs to run after registrations we want to keep otherwise it may
|
||||
* replace them.
|
||||
*/
|
||||
private fun addDefaultCustomSerializers(kryo: Kryo, classToSerializer: Iterable<Pair<Class<*>, CustomSerializerCheckpointAdaptor<*, *>>>) =
|
||||
classToSerializer
|
||||
.forEach { (clazz, customSerializer) -> kryo.addDefaultSerializer(clazz, customSerializer) }
|
||||
|
||||
private fun <T : Any> CheckpointSerializationContext.kryo(task: Kryo.() -> T): T {
|
||||
return getPool(this).run { kryo ->
|
||||
kryo.context.ensureCapacity(properties.size)
|
||||
|
Reference in New Issue
Block a user