mirror of
https://github.com/corda/corda.git
synced 2025-06-12 20:28:18 +00:00
ENT-5043 separate out test utils (#5998)
* Remove unused dependencies from test-common * Explicit imports and formatting * Add core-test-utils project * Add dependency * Move Kryo serialization context to node-api (not serialization as we do not want to pull kryo into the serialization lib) * Move AMQP server serialization scheme to node api * Move serialization tests to node-api * Move internal test helpers without further dependencies. * Move out some types from RPCClientProxyHandler to node-api in preparation for moving the AMQP scheme * Move client AMQP context to node-api so we can move the test serialization rule out. * Move InternalSerializationTestHelpers to core-test-utils * Moved testing.core to core-test-utils * Make detekt happy * Add api-scanner to core-test-utils * Remove inlined package names introduced by IntelliJ refactoring * Update api-current.txt to account for reordering. * Add core-test-utils to list of published artifacts. * Add missing import * Location of things in api text has moved again (publish name of artefact?) * Revert all additions to the API, leaving just the reordering * Code review: fix up core-test-utils build.gradle and introduce kryo version constant. * Remove OpenSsl flag from ssl config stub (can't be used from node-api) * Suppress detekt warning * Move core test util tests to the right module * Expose kotlin test as a transient dependency - projects have come to rely on that. * Fix typo in package name
This commit is contained in:
@ -0,0 +1,21 @@
|
||||
package net.corda.nodeapi.internal.rpc
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* An observable context is constructed on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by the serialization context. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
interface ObservableContextInterface {
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient)
|
||||
|
||||
val observableMap: Cache<Trace.InvocationId, ObservableSubscription>
|
||||
val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<Trace.InvocationId>>
|
||||
val deduplicationIdentity: String
|
||||
val clientAddress: SimpleString
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
package net.corda.nodeapi.internal.rpc
|
||||
|
||||
import rx.Subscription
|
||||
|
||||
class ObservableSubscription(
|
||||
val subscription: Subscription
|
||||
)
|
@ -0,0 +1,84 @@
|
||||
package net.corda.nodeapi.internal.rpc.client
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.internal.toSynchronised
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationContext.UseCase
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||
import net.corda.core.serialization.internal._rpcClientSerializationEnv
|
||||
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
|
||||
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
|
||||
import net.corda.serialization.internal.AMQP_RPC_SERVER_CONTEXT
|
||||
import net.corda.serialization.internal.AMQP_STORAGE_CONTEXT
|
||||
import net.corda.serialization.internal.CordaSerializationMagic
|
||||
import net.corda.serialization.internal.SerializationFactoryImpl
|
||||
import net.corda.serialization.internal.amqp.AbstractAMQPSerializationScheme
|
||||
import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
|
||||
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
|
||||
import net.corda.serialization.internal.amqp.SerializerFactory
|
||||
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
|
||||
import net.corda.serialization.internal.amqp.amqpMagic
|
||||
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
|
||||
|
||||
/**
|
||||
* When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation.
|
||||
* This scheme is for use by the RPC Client calls.
|
||||
*/
|
||||
class AMQPClientSerializationScheme(
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
|
||||
cordappSerializationWhitelists: Set<SerializationWhitelist>,
|
||||
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>
|
||||
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts) {
|
||||
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, cordapps.serializationWhitelists)
|
||||
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>)
|
||||
: this(cordapps.customSerializers, cordapps.serializationWhitelists, serializerFactoriesForContexts)
|
||||
constructor(
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
|
||||
cordappSerializationWhitelists: Set<SerializationWhitelist>
|
||||
) : this(cordappCustomSerializers, cordappSerializationWhitelists, createDefaultSerializerFactoryCache())
|
||||
|
||||
@Suppress("UNUSED")
|
||||
constructor() : this(emptySet(), emptySet())
|
||||
|
||||
companion object {
|
||||
/** Call from main only. */
|
||||
fun initialiseSerialization(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()) {
|
||||
_rpcClientSerializationEnv.set(createSerializationEnv(classLoader, customSerializers,
|
||||
serializationWhitelists, serializerFactoriesForContexts))
|
||||
}
|
||||
|
||||
fun createSerializationEnv(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()): SerializationEnvironment {
|
||||
return SerializationEnvironment.with(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(AMQPClientSerializationScheme(customSerializers, serializationWhitelists, serializerFactoriesForContexts))
|
||||
},
|
||||
storageContext = AMQP_STORAGE_CONTEXT,
|
||||
p2pContext = if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT,
|
||||
rpcClientContext = AMQP_RPC_CLIENT_CONTEXT,
|
||||
rpcServerContext = AMQP_RPC_SERVER_CONTEXT
|
||||
)
|
||||
}
|
||||
|
||||
private fun createDefaultSerializerFactoryCache(): MutableMap<SerializationFactoryCacheKey, SerializerFactory> {
|
||||
return AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()
|
||||
}
|
||||
}
|
||||
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == amqpMagic && (target == UseCase.RPCClient || target == UseCase.P2P)
|
||||
}
|
||||
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactoryBuilder.build(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply {
|
||||
register(RpcClientObservableDeSerializer)
|
||||
register(RpcClientCordaFutureSerializer(this))
|
||||
register(RxNotificationSerializer(this))
|
||||
}
|
||||
}
|
||||
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package net.corda.nodeapi.internal.rpc.client
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import net.corda.core.context.Trace
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/** A throwable that doesn't represent a real error - it's just here to wrap a stack trace. */
|
||||
class CallSite(val rpcName: String) : Throwable("<Call site of root RPC '$rpcName'>")
|
||||
|
||||
typealias RpcObservableMap = Cache<Trace.InvocationId, UnicastSubject<Notification<*>>>
|
||||
typealias CallSiteMap = ConcurrentHashMap<Trace.InvocationId, CallSite?>
|
||||
|
||||
/**
|
||||
* Holds a context available during de-serialisation of messages that are expected to contain Observables.
|
||||
*
|
||||
* @property observableMap holds the Observables that are ultimately exposed to the user.
|
||||
* @property hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
|
||||
* @property callSiteMap keeps stack traces captured when an RPC was invoked, useful for debugging when an observable leaks.
|
||||
*/
|
||||
data class ObservableContext(
|
||||
val callSiteMap: CallSiteMap?,
|
||||
val observableMap: RpcObservableMap,
|
||||
val hardReferenceStore: MutableSet<Observable<*>>
|
||||
)
|
@ -0,0 +1,34 @@
|
||||
package net.corda.nodeapi.internal.rpc.client
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.serialization.internal.amqp.CustomSerializer
|
||||
import net.corda.serialization.internal.amqp.SerializerFactory
|
||||
import rx.Observable
|
||||
import java.io.NotSerializableException
|
||||
|
||||
/**
|
||||
* Serializer for [CordaFuture] instances that can only deserialize such objects (just as the server
|
||||
* side can only serialize them). Futures will have been converted to an Rx [Observable] for serialization.
|
||||
*/
|
||||
class RpcClientCordaFutureSerializer (factory: SerializerFactory)
|
||||
: CustomSerializer.Proxy<CordaFuture<*>, RpcClientCordaFutureSerializer.FutureProxy>(
|
||||
CordaFuture::class.java,
|
||||
FutureProxy::class.java, factory
|
||||
) {
|
||||
override fun fromProxy(proxy: FutureProxy): CordaFuture<*> {
|
||||
try {
|
||||
return proxy.observable.toFuture()
|
||||
} catch (e: NotSerializableException) {
|
||||
throw NotSerializableException("Failed to deserialize Future from proxy Observable - ${e.message}\n").apply {
|
||||
initCause(e.cause)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun toProxy(obj: CordaFuture<*>): FutureProxy {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
data class FutureProxy(val observable: Observable<*>)
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
package net.corda.nodeapi.internal.rpc.client
|
||||
|
||||
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import org.apache.qpid.proton.codec.Data
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.io.NotSerializableException
|
||||
import java.lang.reflect.Type
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.transaction.NotSupportedException
|
||||
|
||||
/**
|
||||
* De-serializer for Rx [Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
|
||||
* just as the corresponding RPC server side class [RpcServerObservableSerializer] can only serialize them. Observables
|
||||
* are only notionally serialized, what is actually sent is a reference to the observable that can then be subscribed to.
|
||||
*/
|
||||
object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
|
||||
private val log = loggerFor<RpcClientObservableDeSerializer>()
|
||||
private object RpcObservableContextKey
|
||||
|
||||
fun createContext(
|
||||
serializationContext: SerializationContext,
|
||||
observableContext: ObservableContext
|
||||
) = serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||
|
||||
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
|
||||
val refCount = AtomicInteger(0)
|
||||
return observable.doOnSubscribe {
|
||||
if (refCount.getAndIncrement() == 0) {
|
||||
require(hardReferenceStore.add(observable)) {
|
||||
"Reference store already contained reference $this on add"
|
||||
}
|
||||
}
|
||||
}.doOnUnsubscribe {
|
||||
if (refCount.decrementAndGet() == 0) {
|
||||
require(hardReferenceStore.remove(observable)) {
|
||||
"Reference store did not contain reference $this on remove"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val schemaForDocumentation = Schema(
|
||||
listOf(
|
||||
CompositeType(
|
||||
name = type.toString(),
|
||||
label = "",
|
||||
provides = emptyList(),
|
||||
descriptor = descriptor,
|
||||
fields = listOf(
|
||||
Field(
|
||||
name = "observableId",
|
||||
type = "string",
|
||||
requires = emptyList(),
|
||||
default = null,
|
||||
label = null,
|
||||
mandatory = true,
|
||||
multiple = false),
|
||||
Field(
|
||||
name = "observableInstant",
|
||||
type = "long",
|
||||
requires = emptyList(),
|
||||
default = null,
|
||||
label = null,
|
||||
mandatory = true,
|
||||
multiple = false)
|
||||
))))
|
||||
|
||||
/**
|
||||
* Converts the serialized form, a blob, back into an Observable
|
||||
*/
|
||||
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput,
|
||||
context: SerializationContext
|
||||
): Observable<*> {
|
||||
if (RpcObservableContextKey !in context.properties) {
|
||||
throw NotSerializableException("Missing Observable Context Key on Client Context")
|
||||
}
|
||||
|
||||
val observableContext =
|
||||
context.properties[RpcObservableContextKey] as ObservableContext
|
||||
|
||||
if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list")
|
||||
if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}")
|
||||
|
||||
val observableId: Trace.InvocationId = Trace.InvocationId((obj[0] as String), Instant.ofEpochMilli((obj[1] as Long)))
|
||||
val observable = UnicastSubject.create<Notification<*>>()
|
||||
|
||||
require(observableContext.observableMap.getIfPresent(observableId) == null) {
|
||||
"Multiple Observables arrived with the same ID $observableId"
|
||||
}
|
||||
|
||||
val rpcCallSite = getRpcCallSite(context, observableContext)
|
||||
observableContext.observableMap.put(observableId, observable)
|
||||
observableContext.callSiteMap?.put(observableId, rpcCallSite)
|
||||
log.trace("Deserialising observable $observableId", rpcCallSite)
|
||||
|
||||
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
|
||||
// don't need to store a reference to the Observables themselves.
|
||||
return pinInSubscriptions(observable, observableContext.hardReferenceStore)
|
||||
.doOnUnsubscribe {
|
||||
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
|
||||
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
|
||||
// The unsubscribe is due to ObservableToFuture's use of first().
|
||||
observableContext.observableMap.invalidate(observableId)
|
||||
}.dematerialize<Any>()
|
||||
}
|
||||
|
||||
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): CallSite? {
|
||||
val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
|
||||
// Will only return non-null if the trackRpcCallSites option in the RPC configuration has been specified.
|
||||
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
|
||||
}
|
||||
|
||||
override fun writeDescribedObject(
|
||||
obj: Observable<*>,
|
||||
data: Data,
|
||||
type: Type,
|
||||
output: SerializationOutput,
|
||||
context: SerializationContext
|
||||
) {
|
||||
throw NotSupportedException()
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.internal.toSynchronised
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.serialization.internal.CordaSerializationMagic
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
|
||||
|
||||
/**
|
||||
* When set as the serialization scheme, defines the RPC Server serialization scheme as using the Corda
|
||||
* AMQP implementation.
|
||||
*/
|
||||
class AMQPServerSerializationScheme(
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
|
||||
cordappSerializationWhitelists: Set<SerializationWhitelist>,
|
||||
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>
|
||||
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts) {
|
||||
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
|
||||
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, serializerFactoriesForContexts)
|
||||
|
||||
constructor() : this(emptySet(), emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised() )
|
||||
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactoryBuilder.build(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
register(RpcServerCordaFutureSerializer(this))
|
||||
register(RxNotificationSerializer(this))
|
||||
}
|
||||
}
|
||||
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return canDeserializeVersion(magic) &&
|
||||
( target == SerializationContext.UseCase.P2P
|
||||
|| target == SerializationContext.UseCase.Storage
|
||||
|| target == SerializationContext.UseCase.RPCServer)
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.toObservable
|
||||
import net.corda.serialization.internal.amqp.CustomSerializer
|
||||
import net.corda.serialization.internal.amqp.SerializerFactory
|
||||
import rx.Observable
|
||||
import java.io.NotSerializableException
|
||||
|
||||
/**
|
||||
* Serializer for [CordaFuture] objects where Futures are converted to Observables and
|
||||
* are thus dealt with by the [RpcServerObservableSerializer]
|
||||
*/
|
||||
class RpcServerCordaFutureSerializer(factory: SerializerFactory)
|
||||
: CustomSerializer.Proxy<CordaFuture<*>,
|
||||
RpcServerCordaFutureSerializer.FutureProxy>(
|
||||
CordaFuture::class.java, FutureProxy::class.java, factory
|
||||
) {
|
||||
override fun fromProxy(proxy: FutureProxy): CordaFuture<*> {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun toProxy(obj: CordaFuture<*>): FutureProxy {
|
||||
try {
|
||||
return FutureProxy(obj.toObservable())
|
||||
} catch (e: NotSerializableException) {
|
||||
throw (NotSerializableException("Failed to serialize Future as proxy Observable - ${e.message}"))
|
||||
}
|
||||
}
|
||||
|
||||
data class FutureProxy(val observable: Observable<*>)
|
||||
}
|
||||
|
||||
|
||||
|
@ -0,0 +1,143 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.rpc.ObservableContextInterface
|
||||
import net.corda.nodeapi.internal.rpc.ObservableSubscription
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import org.apache.qpid.proton.codec.Data
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import java.io.NotSerializableException
|
||||
import java.lang.reflect.Type
|
||||
|
||||
/**
|
||||
* Server side serializer that notionally serializes RxObservables when used by the RPC
|
||||
* framework for event subscriptions. Notional in the sense that the actual observable
|
||||
* isn't serialized, rather a reference to the observable is, this is then used by
|
||||
* the client side RPC handler to subscribe to the observable stream.
|
||||
*/
|
||||
class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>(
|
||||
Observable::class.java
|
||||
) {
|
||||
// Would be great to make this private, but then it's so much harder to unit test
|
||||
object RpcObservableContextKey
|
||||
|
||||
companion object {
|
||||
fun createContext(
|
||||
serializationContext: SerializationContext,
|
||||
observableContext: ObservableContextInterface
|
||||
) = serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
override val schemaForDocumentation = Schema(
|
||||
listOf(
|
||||
CompositeType(
|
||||
name = type.toString(),
|
||||
label = "",
|
||||
provides = emptyList(),
|
||||
descriptor = descriptor,
|
||||
fields = listOf(
|
||||
Field(
|
||||
name = "observableId",
|
||||
type = "string",
|
||||
requires = emptyList(),
|
||||
default = null,
|
||||
label = null,
|
||||
mandatory = true,
|
||||
multiple = false),
|
||||
Field(
|
||||
name = "observableInstant",
|
||||
type = "long",
|
||||
requires = emptyList(),
|
||||
default = null,
|
||||
label = null,
|
||||
mandatory = true,
|
||||
multiple = false)
|
||||
))))
|
||||
|
||||
override fun readObject(
|
||||
obj: Any, schemas: SerializationSchemas,
|
||||
input: DeserializationInput,
|
||||
context: SerializationContext
|
||||
): Observable<*> {
|
||||
// Note: this type of server Serializer is never meant to read postings arriving from clients.
|
||||
// I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values.
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun writeDescribedObject(
|
||||
obj: Observable<*>,
|
||||
data: Data,
|
||||
type: Type,
|
||||
output: SerializationOutput,
|
||||
context: SerializationContext
|
||||
) {
|
||||
val observableId = Trace.InvocationId.newInstance()
|
||||
if (RpcObservableContextKey !in context.properties) {
|
||||
throw NotSerializableException("Missing Observable Key on serialization context - $type")
|
||||
}
|
||||
|
||||
val observableContext = context.properties[RpcObservableContextKey]
|
||||
as ObservableContextInterface
|
||||
|
||||
data.withList {
|
||||
data.putString(observableId.value)
|
||||
data.putLong(observableId.timestamp.toEpochMilli())
|
||||
}
|
||||
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
subscription = obj.materialize().subscribe(
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onError(exception: Throwable) {
|
||||
loggerFor<RpcServerObservableSerializer>().error(
|
||||
"onError called in materialize()d RPC Observable", exception)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables != null) {
|
||||
observables.remove(observableId)
|
||||
if (observables.isEmpty()) {
|
||||
null
|
||||
} else {
|
||||
observables
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables == null) {
|
||||
hashSetOf(observableId)
|
||||
} else {
|
||||
observables.add(observableId)
|
||||
observables
|
||||
}
|
||||
}
|
||||
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||
log.trace("Serialized observable $observableId of type $obj")
|
||||
}
|
||||
}
|
@ -0,0 +1,179 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.*
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.serializers.FieldSerializer
|
||||
import com.esotericsoftware.kryo.util.DefaultClassResolver
|
||||
import com.esotericsoftware.kryo.util.Util
|
||||
import net.corda.core.internal.kotlinObjectInstance
|
||||
import net.corda.core.internal.writer
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.ClassWhitelist
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.serialization.internal.AttachmentsClassLoader
|
||||
import net.corda.serialization.internal.MutableClassWhitelist
|
||||
import net.corda.serialization.internal.TransientClassWhiteList
|
||||
import net.corda.serialization.internal.amqp.hasCordaSerializable
|
||||
import java.io.PrintWriter
|
||||
import java.lang.reflect.Modifier.isAbstract
|
||||
import java.nio.charset.StandardCharsets.UTF_8
|
||||
import java.nio.file.Paths
|
||||
import java.nio.file.StandardOpenOption.*
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Corda specific class resolver which enables extra customisation for the purposes of serialization using Kryo
|
||||
*/
|
||||
class CordaClassResolver(serializationContext: CheckpointSerializationContext) : DefaultClassResolver() {
|
||||
val whitelist: ClassWhitelist = TransientClassWhiteList(serializationContext.whitelist)
|
||||
|
||||
// These classes are assignment-compatible Java equivalents of Kotlin classes.
|
||||
// The point is that we do not want to send Kotlin types "over the wire" via RPC.
|
||||
private val javaAliases: Map<Class<*>, Class<*>> = mapOf(
|
||||
listOf<Any>().javaClass to Collections.emptyList<Any>().javaClass,
|
||||
setOf<Any>().javaClass to Collections.emptySet<Any>().javaClass,
|
||||
mapOf<Any, Any>().javaClass to Collections.emptyMap<Any, Any>().javaClass
|
||||
)
|
||||
|
||||
private fun typeForSerializationOf(type: Class<*>): Class<*> = javaAliases[type] ?: type
|
||||
|
||||
/** Returns the registration for the specified class, or null if the class is not registered. */
|
||||
override fun getRegistration(type: Class<*>): Registration? {
|
||||
val targetType = typeForSerializationOf(type)
|
||||
return super.getRegistration(targetType) ?: checkClass(targetType)
|
||||
}
|
||||
|
||||
private var whitelistEnabled = true
|
||||
|
||||
fun disableWhitelist() {
|
||||
whitelistEnabled = false
|
||||
}
|
||||
|
||||
fun enableWhitelist() {
|
||||
whitelistEnabled = true
|
||||
}
|
||||
|
||||
private fun checkClass(type: Class<*>): Registration? {
|
||||
// If call path has disabled whitelisting (see [CordaKryo.register]), just return without checking.
|
||||
if (!whitelistEnabled) return null
|
||||
// If array, recurse on element type
|
||||
if (type.isArray) return checkClass(type.componentType)
|
||||
// Specialised enum entry, so just resolve the parent Enum type since cannot annotate the specialised entry.
|
||||
if (!type.isEnum && Enum::class.java.isAssignableFrom(type)) return checkClass(type.superclass)
|
||||
// Allow primitives, abstracts and interfaces. Note that we can also create abstract Enum types,
|
||||
// but we don't want to whitelist those here.
|
||||
if (type.isPrimitive || type == Any::class.java || type == String::class.java || (!type.isEnum && isAbstract(type.modifiers))) return null
|
||||
// It's safe to have the Class already, since Kryo loads it with initialisation off.
|
||||
// If we use a whitelist with blacklisting capabilities, whitelist.hasListed(type) may throw an IllegalStateException if input class is blacklisted.
|
||||
// Thus, blacklisting precedes annotation checking.
|
||||
if (!whitelist.hasListed(type) && !checkForAnnotation(type)) {
|
||||
throw KryoException("Class ${Util.className(type)} is not annotated or on the whitelist, so cannot be used in serialization")
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
override fun registerImplicit(type: Class<*>): Registration {
|
||||
val targetType = typeForSerializationOf(type)
|
||||
val objectInstance = targetType.kotlinObjectInstance
|
||||
|
||||
// We have to set reference to true, since the flag influences how String fields are treated and we want it to be consistent.
|
||||
val references = kryo.references
|
||||
try {
|
||||
kryo.references = true
|
||||
val serializer = when {
|
||||
objectInstance != null -> KotlinObjectSerializer(objectInstance)
|
||||
kotlin.jvm.internal.Lambda::class.java.isAssignableFrom(targetType) -> // Kotlin lambdas extend this class and any captured variables are stored in synthetic fields
|
||||
FieldSerializer<Any>(kryo, targetType).apply { setIgnoreSyntheticFields(false) }
|
||||
Throwable::class.java.isAssignableFrom(targetType) -> ThrowableSerializer(kryo, targetType)
|
||||
else -> kryo.getDefaultSerializer(targetType)
|
||||
}
|
||||
return register(Registration(targetType, serializer, NAME.toInt()))
|
||||
} finally {
|
||||
kryo.references = references
|
||||
}
|
||||
}
|
||||
|
||||
override fun writeName(output: Output, type: Class<*>, registration: Registration) {
|
||||
super.writeName(output, registration.type ?: type, registration)
|
||||
}
|
||||
|
||||
// Trivial Serializer which simply returns the given instance, which we already know is a Kotlin object
|
||||
private class KotlinObjectSerializer(private val objectInstance: Any) : Serializer<Any>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Any>): Any = objectInstance
|
||||
override fun write(kryo: Kryo, output: Output, obj: Any) = Unit
|
||||
}
|
||||
|
||||
// We don't allow the annotation for classes in attachments for now. The class will be on the main classpath if we have the CorDapp installed.
|
||||
// We also do not allow extension of KryoSerializable for annotated classes, or combination with @DefaultSerializer for custom serialisation.
|
||||
// TODO: Later we can support annotations on attachment classes and spin up a proxy via bytecode that we know is harmless.
|
||||
private fun checkForAnnotation(type: Class<*>): Boolean {
|
||||
return (type.classLoader !is AttachmentsClassLoader)
|
||||
&& !KryoSerializable::class.java.isAssignableFrom(type)
|
||||
&& !type.isAnnotationPresent(DefaultSerializer::class.java)
|
||||
&& hasCordaSerializable(type)
|
||||
}
|
||||
|
||||
// Need to clear out class names from attachments.
|
||||
override fun reset() {
|
||||
super.reset()
|
||||
// Kryo creates a cache of class name to Class<*> which does not work so well with multiple class loaders.
|
||||
// TODO: come up with a more efficient way. e.g. segregate the name space by class loader.
|
||||
if (nameToClass != null) {
|
||||
val classesToRemove: MutableList<String> = ArrayList(nameToClass.size)
|
||||
nameToClass.entries()
|
||||
.filter { it.value.classLoader is AttachmentsClassLoader }
|
||||
.forEach { classesToRemove += it.key }
|
||||
for (className in classesToRemove) {
|
||||
nameToClass.remove(className)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is not currently used, but can be installed to log a large number of missing entries from the whitelist
|
||||
* and was used to track down the initial set.
|
||||
*/
|
||||
@Suppress("unused")
|
||||
class LoggingWhitelist(val delegate: ClassWhitelist, val global: Boolean = true) : MutableClassWhitelist {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
val globallySeen: MutableSet<String> = Collections.synchronizedSet(mutableSetOf())
|
||||
val journalWriter: PrintWriter? = openOptionalDynamicWhitelistJournal()
|
||||
|
||||
private fun openOptionalDynamicWhitelistJournal(): PrintWriter? {
|
||||
val fileName = System.getenv("WHITELIST_FILE")
|
||||
if (fileName != null && fileName.isNotEmpty()) {
|
||||
try {
|
||||
return PrintWriter(Paths.get(fileName).writer(UTF_8, CREATE, APPEND, WRITE), true)
|
||||
} catch (ioEx: Exception) {
|
||||
log.error("Could not open/create whitelist journal file for append: $fileName", ioEx)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
private val locallySeen: MutableSet<String> = mutableSetOf()
|
||||
private val alreadySeen: MutableSet<String> get() = if (global) globallySeen else locallySeen
|
||||
|
||||
override fun hasListed(type: Class<*>): Boolean {
|
||||
if (type.name !in alreadySeen && !delegate.hasListed(type)) {
|
||||
alreadySeen += type.name
|
||||
val className = Util.className(type)
|
||||
log.warn("Dynamically whitelisted class $className")
|
||||
journalWriter?.println(className)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun add(entry: Class<*>) {
|
||||
if (delegate is MutableClassWhitelist) {
|
||||
delegate.add(entry)
|
||||
} else {
|
||||
throw UnsupportedOperationException("Cannot add to whitelist since delegate whitelist is not mutable.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import java.io.Serializable
|
||||
|
||||
object CordaClosureSerializer : ClosureSerializer() {
|
||||
const val ERROR_MESSAGE = "Unable to serialize Java Lambda expression, unless explicitly declared e.g., Runnable r = (Runnable & Serializable) () -> System.out.println(\"Hello world!\");"
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, target: Any) {
|
||||
if (!isSerializable(target)) {
|
||||
throw IllegalArgumentException(ERROR_MESSAGE)
|
||||
}
|
||||
super.write(kryo, output, target)
|
||||
}
|
||||
|
||||
private fun isSerializable(target: Any): Boolean {
|
||||
return target is Serializable
|
||||
}
|
||||
}
|
||||
|
||||
object CordaClosureBlacklistSerializer : ClosureSerializer() {
|
||||
const val ERROR_MESSAGE = "Java 8 Lambda expressions are not supported for serialization."
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, target: Any) {
|
||||
throw IllegalArgumentException(ERROR_MESSAGE)
|
||||
}
|
||||
}
|
@ -0,0 +1,249 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
|
||||
import com.esotericsoftware.kryo.serializers.FieldSerializer
|
||||
import de.javakaffee.kryoserializers.ArraysAsListSerializer
|
||||
import de.javakaffee.kryoserializers.BitSetSerializer
|
||||
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
|
||||
import de.javakaffee.kryoserializers.guava.*
|
||||
import net.corda.core.contracts.ContractAttachment
|
||||
import net.corda.core.contracts.ContractClassName
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.AbstractAttachment
|
||||
import net.corda.core.internal.LazyMappedList
|
||||
import net.corda.core.internal.readFully
|
||||
import net.corda.core.serialization.MissingAttachmentsException
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.serialization.internal.DefaultWhitelist
|
||||
import net.corda.serialization.internal.GeneratedAttachment
|
||||
import net.corda.serialization.internal.MutableClassWhitelist
|
||||
import net.i2p.crypto.eddsa.EdDSAPrivateKey
|
||||
import net.i2p.crypto.eddsa.EdDSAPublicKey
|
||||
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPrivateKey
|
||||
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPublicKey
|
||||
import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPrivateCrtKey
|
||||
import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPublicKey
|
||||
import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PrivateKey
|
||||
import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PublicKey
|
||||
import org.objenesis.instantiator.ObjectInstantiator
|
||||
import org.objenesis.strategy.InstantiatorStrategy
|
||||
import org.objenesis.strategy.StdInstantiatorStrategy
|
||||
import org.slf4j.Logger
|
||||
import java.io.BufferedInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.FileInputStream
|
||||
import java.io.InputStream
|
||||
import java.lang.reflect.Modifier.isPublic
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.*
|
||||
import kotlin.collections.ArrayList
|
||||
|
||||
object DefaultKryoCustomizer {
|
||||
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this.javaClass.classLoader).toList() + DefaultWhitelist
|
||||
}
|
||||
|
||||
fun customize(kryo: Kryo, publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer): Kryo {
|
||||
return kryo.apply {
|
||||
// Store a little schema of field names in the stream the first time a class is used which increases tolerance
|
||||
// for change to a class.
|
||||
setDefaultSerializer(CompatibleFieldSerializer::class.java)
|
||||
// Take the safest route here and allow subclasses to have fields named the same as super classes.
|
||||
fieldSerializerConfig.cachedFieldNameStrategy = FieldSerializer.CachedFieldNameStrategy.EXTENDED
|
||||
|
||||
instantiatorStrategy = CustomInstantiatorStrategy()
|
||||
|
||||
// Required for HashCheckingStream (de)serialization.
|
||||
// Note that return type should be specifically set to InputStream, otherwise it may not work, i.e. val aStream : InputStream = HashCheckingStream(...).
|
||||
addDefaultSerializer(InputStream::class.java, InputStreamSerializer)
|
||||
addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer<SerializeAsToken>())
|
||||
addDefaultSerializer(Logger::class.java, LoggerSerializer)
|
||||
addDefaultSerializer(X509Certificate::class.java, X509CertificateSerializer)
|
||||
|
||||
// WARNING: reordering the registrations here will cause a change in the serialized form, since classes
|
||||
// with custom serializers get written as registration ids. This will break backwards-compatibility.
|
||||
// Please add any new registrations to the end.
|
||||
// TODO: re-organise registrations into logical groups before v1.0
|
||||
|
||||
register(Arrays.asList("").javaClass, ArraysAsListSerializer())
|
||||
register(LazyMappedList::class.java, LazyMappedListSerializer)
|
||||
register(SignedTransaction::class.java, SignedTransactionSerializer)
|
||||
register(WireTransaction::class.java, WireTransactionSerializer)
|
||||
register(SerializedBytes::class.java, SerializedBytesSerializer)
|
||||
UnmodifiableCollectionsSerializer.registerSerializers(this)
|
||||
ImmutableListSerializer.registerSerializers(this)
|
||||
ImmutableSetSerializer.registerSerializers(this)
|
||||
ImmutableSortedSetSerializer.registerSerializers(this)
|
||||
ImmutableMapSerializer.registerSerializers(this)
|
||||
ImmutableMultimapSerializer.registerSerializers(this)
|
||||
// InputStream subclasses whitelisting, required for attachments.
|
||||
register(BufferedInputStream::class.java, InputStreamSerializer)
|
||||
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
|
||||
noReferencesWithin<WireTransaction>()
|
||||
register(PublicKey::class.java, publicKeySerializer)
|
||||
register(PrivateKey::class.java, PrivateKeySerializer)
|
||||
register(EdDSAPublicKey::class.java, publicKeySerializer)
|
||||
register(EdDSAPrivateKey::class.java, PrivateKeySerializer)
|
||||
register(CompositeKey::class.java, publicKeySerializer) // Using a custom serializer for compactness
|
||||
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
|
||||
register(Array<StackTraceElement>::class, read = { _, _ -> emptyArray() }, write = { _, _, _ -> })
|
||||
// This ensures a NonEmptySetSerializer is constructed with an initial value.
|
||||
register(NonEmptySet::class.java, NonEmptySetSerializer)
|
||||
register(BitSet::class.java, BitSetSerializer())
|
||||
register(Class::class.java, ClassSerializer)
|
||||
register(FileInputStream::class.java, InputStreamSerializer)
|
||||
register(CertPath::class.java, CertPathSerializer)
|
||||
register(BCECPrivateKey::class.java, PrivateKeySerializer)
|
||||
register(BCECPublicKey::class.java, publicKeySerializer)
|
||||
register(BCRSAPrivateCrtKey::class.java, PrivateKeySerializer)
|
||||
register(BCRSAPublicKey::class.java, publicKeySerializer)
|
||||
register(BCSphincs256PrivateKey::class.java, PrivateKeySerializer)
|
||||
register(BCSphincs256PublicKey::class.java, publicKeySerializer)
|
||||
register(NotaryChangeWireTransaction::class.java, NotaryChangeWireTransactionSerializer)
|
||||
register(PartyAndCertificate::class.java, PartyAndCertificateSerializer)
|
||||
|
||||
// Don't deserialize PrivacySalt via its default constructor.
|
||||
register(PrivacySalt::class.java, PrivacySaltSerializer)
|
||||
|
||||
// Used by the remote verifier, and will possibly be removed in future.
|
||||
register(ContractAttachment::class.java, ContractAttachmentSerializer)
|
||||
|
||||
register(java.lang.invoke.SerializedLambda::class.java)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureBlacklistSerializer)
|
||||
register(ContractUpgradeWireTransaction::class.java, ContractUpgradeWireTransactionSerializer)
|
||||
register(ContractUpgradeFilteredTransaction::class.java, ContractUpgradeFilteredTransactionSerializer)
|
||||
|
||||
for (whitelistProvider in serializationWhitelists) {
|
||||
val types = whitelistProvider.whitelist
|
||||
require(types.toSet().size == types.size) {
|
||||
val duplicates = types.toMutableList()
|
||||
types.toSet().forEach { duplicates -= it }
|
||||
"Cannot add duplicate classes to the whitelist ($duplicates)."
|
||||
}
|
||||
for (type in types) {
|
||||
((kryo.classResolver as? CordaClassResolver)?.whitelist as? MutableClassWhitelist)?.add(type)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class CustomInstantiatorStrategy : InstantiatorStrategy {
|
||||
private val fallbackStrategy = StdInstantiatorStrategy()
|
||||
// Use this to allow construction of objects using a JVM backdoor that skips invoking the constructors, if there
|
||||
// is no no-arg constructor available.
|
||||
private val defaultStrategy = Kryo.DefaultInstantiatorStrategy(fallbackStrategy)
|
||||
|
||||
override fun <T> newInstantiatorOf(type: Class<T>): ObjectInstantiator<T> {
|
||||
// However this doesn't work for non-public classes in the java. namespace
|
||||
val strat = if (type.name.startsWith("java.") && !isPublic(type.modifiers)) fallbackStrategy else defaultStrategy
|
||||
return strat.newInstantiatorOf(type)
|
||||
}
|
||||
}
|
||||
|
||||
private object PartyAndCertificateSerializer : Serializer<PartyAndCertificate>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: PartyAndCertificate) {
|
||||
kryo.writeClassAndObject(output, obj.certPath)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<PartyAndCertificate>): PartyAndCertificate {
|
||||
return PartyAndCertificate(kryo.readClassAndObject(input) as CertPath)
|
||||
}
|
||||
}
|
||||
|
||||
private object NonEmptySetSerializer : Serializer<NonEmptySet<Any>>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: NonEmptySet<Any>) {
|
||||
// Write out the contents as normal
|
||||
output.writeInt(obj.size, true)
|
||||
obj.forEach { kryo.writeClassAndObject(output, it) }
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<NonEmptySet<Any>>): NonEmptySet<Any> {
|
||||
val size = input.readInt(true)
|
||||
require(size >= 1) { "Invalid size read off the wire: $size" }
|
||||
val list = ArrayList<Any>(size)
|
||||
repeat(size) {
|
||||
list += kryo.readClassAndObject(input)
|
||||
}
|
||||
return list.toNonEmptySet()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Avoid deserialising PrivacySalt via its default constructor
|
||||
* because the random number generator may not be available.
|
||||
*/
|
||||
private object PrivacySaltSerializer : Serializer<PrivacySalt>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: PrivacySalt) {
|
||||
output.writeBytesWithLength(obj.bytes)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<PrivacySalt>): PrivacySalt {
|
||||
return PrivacySalt(input.readBytesWithLength())
|
||||
}
|
||||
}
|
||||
|
||||
private object ContractAttachmentSerializer : Serializer<ContractAttachment>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: ContractAttachment) {
|
||||
if (kryo.serializationContext() != null) {
|
||||
obj.attachment.id.writeTo(output)
|
||||
} else {
|
||||
val buffer = ByteArrayOutputStream()
|
||||
obj.attachment.open().use { it.copyTo(buffer) }
|
||||
output.writeBytesWithLength(buffer.toByteArray())
|
||||
}
|
||||
output.writeString(obj.contract)
|
||||
kryo.writeClassAndObject(output, obj.additionalContracts)
|
||||
output.writeString(obj.uploader)
|
||||
kryo.writeClassAndObject(output, obj.signerKeys)
|
||||
output.writeInt(obj.version)
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<ContractAttachment>): ContractAttachment {
|
||||
if (kryo.serializationContext() != null) {
|
||||
val attachmentHash = SecureHash.SHA256(input.readBytes(32))
|
||||
val contract = input.readString()
|
||||
val additionalContracts = kryo.readClassAndObject(input) as Set<ContractClassName>
|
||||
val uploader = input.readString()
|
||||
val signers = kryo.readClassAndObject(input) as List<PublicKey>
|
||||
val version = input.readInt()
|
||||
val context = kryo.serializationContext()!!
|
||||
val attachmentStorage = context.serviceHub.attachments
|
||||
|
||||
val lazyAttachment = object : AbstractAttachment({
|
||||
val attachment = attachmentStorage.openAttachment(attachmentHash)
|
||||
?: throw MissingAttachmentsException(listOf(attachmentHash))
|
||||
attachment.open().readFully()
|
||||
}, uploader) {
|
||||
override val id = attachmentHash
|
||||
}
|
||||
|
||||
return ContractAttachment.create(lazyAttachment, contract, additionalContracts, uploader, signers, version)
|
||||
} else {
|
||||
val attachment = GeneratedAttachment(input.readBytesWithLength(), "generated")
|
||||
val contract = input.readString()
|
||||
val additionalContracts = kryo.readClassAndObject(input) as Set<ContractClassName>
|
||||
val uploader = input.readString()
|
||||
val signers = kryo.readClassAndObject(input) as List<PublicKey>
|
||||
val version = input.readInt()
|
||||
return ContractAttachment.create(attachment, contract, additionalContracts, uploader, signers, version)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,497 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.*
|
||||
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
|
||||
import com.esotericsoftware.kryo.serializers.FieldSerializer
|
||||
import com.esotericsoftware.kryo.util.MapReferenceResolver
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.internal.LazyMappedList
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.serialization.internal.serializationContextKey
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.InputStream
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.KMutableProperty
|
||||
import kotlin.reflect.KParameter
|
||||
import kotlin.reflect.full.memberProperties
|
||||
import kotlin.reflect.full.primaryConstructor
|
||||
import kotlin.reflect.jvm.isAccessible
|
||||
import kotlin.reflect.jvm.javaType
|
||||
|
||||
/**
|
||||
* Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead
|
||||
* simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as
|
||||
* it will happily deserialise literally anything, including malicious streams that would reconstruct classes
|
||||
* in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is
|
||||
* absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have
|
||||
* the AMQP implementation.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure
|
||||
* type safety hack.
|
||||
*/
|
||||
object SerializedBytesSerializer : Serializer<SerializedBytes<Any>>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: SerializedBytes<Any>) {
|
||||
output.writeVarInt(obj.size, true)
|
||||
obj.writeTo(output)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<SerializedBytes<Any>>): SerializedBytes<Any> {
|
||||
return SerializedBytes(input.readBytes(input.readVarInt(true)))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes properties and deserializes by using the constructor. This assumes that all backed properties are
|
||||
* set via the constructor and the class is immutable.
|
||||
*/
|
||||
class ImmutableClassSerializer<T : Any>(val klass: KClass<T>) : Serializer<T>() {
|
||||
val props = klass.memberProperties.sortedBy { it.name }
|
||||
val propsByName = props.associateBy { it.name }
|
||||
val constructor = klass.primaryConstructor!!
|
||||
|
||||
init {
|
||||
props.forEach {
|
||||
require(it !is KMutableProperty<*>) { "$it mutable property of class: ${klass} is unsupported" }
|
||||
}
|
||||
}
|
||||
|
||||
// Just a utility to help us catch cases where nodes are running out of sync versions.
|
||||
private fun hashParameters(params: List<KParameter>): Int {
|
||||
return params.map {
|
||||
(it.name ?: "") + it.index.toString() + it.type.javaType.typeName
|
||||
}.hashCode()
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, obj: T) {
|
||||
output.writeVarInt(constructor.parameters.size, true)
|
||||
output.writeInt(hashParameters(constructor.parameters))
|
||||
for (param in constructor.parameters) {
|
||||
val kProperty = propsByName[param.name!!]!!
|
||||
kProperty.isAccessible = true
|
||||
when (param.type.javaType.typeName) {
|
||||
"int" -> output.writeVarInt(kProperty.get(obj) as Int, true)
|
||||
"long" -> output.writeVarLong(kProperty.get(obj) as Long, true)
|
||||
"short" -> output.writeShort(kProperty.get(obj) as Int)
|
||||
"char" -> output.writeChar(kProperty.get(obj) as Char)
|
||||
"byte" -> output.writeByte(kProperty.get(obj) as Byte)
|
||||
"double" -> output.writeDouble(kProperty.get(obj) as Double)
|
||||
"float" -> output.writeFloat(kProperty.get(obj) as Float)
|
||||
"boolean" -> output.writeBoolean(kProperty.get(obj) as Boolean)
|
||||
else -> try {
|
||||
kryo.writeClassAndObject(output, kProperty.get(obj))
|
||||
} catch (e: Exception) {
|
||||
throw IllegalStateException("Failed to serialize ${param.name} in ${klass.qualifiedName}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
|
||||
require(type.kotlin == klass)
|
||||
val numFields = input.readVarInt(true)
|
||||
val fieldTypeHash = input.readInt()
|
||||
|
||||
// A few quick checks for data evolution. Note that this is not guaranteed to catch every problem! But it's
|
||||
// good enough for a prototype.
|
||||
if (numFields != constructor.parameters.size)
|
||||
throw KryoException("Mismatch between number of constructor parameters and number of serialised fields " +
|
||||
"for ${klass.qualifiedName} ($numFields vs ${constructor.parameters.size})")
|
||||
if (fieldTypeHash != hashParameters(constructor.parameters))
|
||||
throw KryoException("Hashcode mismatch for parameter types for ${klass.qualifiedName}: unsupported type evolution has happened.")
|
||||
|
||||
val args = arrayOfNulls<Any?>(numFields)
|
||||
var cursor = 0
|
||||
for (param in constructor.parameters) {
|
||||
args[cursor++] = when (param.type.javaType.typeName) {
|
||||
"int" -> input.readVarInt(true)
|
||||
"long" -> input.readVarLong(true)
|
||||
"short" -> input.readShort()
|
||||
"char" -> input.readChar()
|
||||
"byte" -> input.readByte()
|
||||
"double" -> input.readDouble()
|
||||
"float" -> input.readFloat()
|
||||
"boolean" -> input.readBoolean()
|
||||
else -> kryo.readClassAndObject(input)
|
||||
}
|
||||
}
|
||||
// If the constructor throws an exception, pass it through instead of wrapping it.
|
||||
return try {
|
||||
constructor.call(*args)
|
||||
} catch (e: InvocationTargetException) {
|
||||
throw e.cause!!
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO This is a temporary inefficient serializer for sending InputStreams through RPC. This may be done much more
|
||||
// efficiently using Artemis's large message feature.
|
||||
object InputStreamSerializer : Serializer<InputStream>() {
|
||||
override fun write(kryo: Kryo, output: Output, stream: InputStream) {
|
||||
val buffer = ByteArray(4096)
|
||||
while (true) {
|
||||
val numberOfBytesRead = stream.read(buffer)
|
||||
if (numberOfBytesRead != -1) {
|
||||
output.writeInt(numberOfBytesRead, true)
|
||||
output.writeBytes(buffer, 0, numberOfBytesRead)
|
||||
} else {
|
||||
output.writeInt(0, true)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<InputStream>): InputStream {
|
||||
val chunks = ArrayList<ByteArray>()
|
||||
while (true) {
|
||||
val chunk = input.readBytesWithLength()
|
||||
if (chunk.isEmpty()) {
|
||||
break
|
||||
} else {
|
||||
chunks.add(chunk)
|
||||
}
|
||||
}
|
||||
val flattened = ByteArray(chunks.sumBy { it.size })
|
||||
var offset = 0
|
||||
for (chunk in chunks) {
|
||||
System.arraycopy(chunk, 0, flattened, offset, chunk.size)
|
||||
offset += chunk.size
|
||||
}
|
||||
return flattened.inputStream()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
inline fun <T> Kryo.useClassLoader(cl: ClassLoader, body: () -> T): T {
|
||||
val tmp = this.classLoader ?: ClassLoader.getSystemClassLoader()
|
||||
this.classLoader = cl
|
||||
try {
|
||||
return body()
|
||||
} finally {
|
||||
this.classLoader = tmp
|
||||
}
|
||||
}
|
||||
|
||||
fun Output.writeBytesWithLength(byteArray: ByteArray) {
|
||||
this.writeInt(byteArray.size, true)
|
||||
this.writeBytes(byteArray)
|
||||
}
|
||||
|
||||
fun Input.readBytesWithLength(): ByteArray {
|
||||
val size = this.readInt(true)
|
||||
return this.readBytes(size)
|
||||
}
|
||||
|
||||
/** A serialisation engine that knows how to deserialise code inside a sandbox */
|
||||
@ThreadSafe
|
||||
object WireTransactionSerializer : Serializer<WireTransaction>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: WireTransaction) {
|
||||
kryo.writeClassAndObject(output, obj.componentGroups)
|
||||
kryo.writeClassAndObject(output, obj.privacySalt)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<WireTransaction>): WireTransaction {
|
||||
val componentGroups: List<ComponentGroup> = uncheckedCast(kryo.readClassAndObject(input))
|
||||
val privacySalt = kryo.readClassAndObject(input) as PrivacySalt
|
||||
return WireTransaction(componentGroups, privacySalt)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object NotaryChangeWireTransactionSerializer : Serializer<NotaryChangeWireTransaction>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: NotaryChangeWireTransaction) {
|
||||
kryo.writeClassAndObject(output, obj.serializedComponents)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<NotaryChangeWireTransaction>): NotaryChangeWireTransaction {
|
||||
val components: List<OpaqueBytes> = uncheckedCast(kryo.readClassAndObject(input))
|
||||
return NotaryChangeWireTransaction(components)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object ContractUpgradeWireTransactionSerializer : Serializer<ContractUpgradeWireTransaction>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: ContractUpgradeWireTransaction) {
|
||||
kryo.writeClassAndObject(output, obj.serializedComponents)
|
||||
kryo.writeClassAndObject(output, obj.privacySalt)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<ContractUpgradeWireTransaction>): ContractUpgradeWireTransaction {
|
||||
val components: List<OpaqueBytes> = uncheckedCast(kryo.readClassAndObject(input))
|
||||
val privacySalt = kryo.readClassAndObject(input) as PrivacySalt
|
||||
|
||||
return ContractUpgradeWireTransaction(components, privacySalt)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object ContractUpgradeFilteredTransactionSerializer : Serializer<ContractUpgradeFilteredTransaction>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: ContractUpgradeFilteredTransaction) {
|
||||
kryo.writeClassAndObject(output, obj.visibleComponents)
|
||||
kryo.writeClassAndObject(output, obj.hiddenComponents)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<ContractUpgradeFilteredTransaction>): ContractUpgradeFilteredTransaction {
|
||||
val visibleComponents: Map<Int, ContractUpgradeFilteredTransaction.FilteredComponent> = uncheckedCast(kryo.readClassAndObject(input))
|
||||
val hiddenComponents: Map<Int, SecureHash> = uncheckedCast(kryo.readClassAndObject(input))
|
||||
return ContractUpgradeFilteredTransaction(visibleComponents, hiddenComponents)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object SignedTransactionSerializer : Serializer<SignedTransaction>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: SignedTransaction) {
|
||||
kryo.writeClassAndObject(output, obj.txBits)
|
||||
kryo.writeClassAndObject(output, obj.sigs)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<SignedTransaction>): SignedTransaction {
|
||||
return SignedTransaction(
|
||||
uncheckedCast<Any?, SerializedBytes<CoreTransaction>>(kryo.readClassAndObject(input)),
|
||||
uncheckedCast<Any?, List<TransactionSignature>>(kryo.readClassAndObject(input))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object PrivateKeySerializer : Serializer<PrivateKey>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: PrivateKey) {
|
||||
output.writeBytesWithLength(obj.encoded)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<PrivateKey>): PrivateKey {
|
||||
val A = input.readBytesWithLength()
|
||||
return Crypto.decodePrivateKey(A)
|
||||
}
|
||||
}
|
||||
|
||||
/** For serialising a public key */
|
||||
@ThreadSafe
|
||||
object PublicKeySerializer : Serializer<PublicKey>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: PublicKey) {
|
||||
// TODO: Instead of encoding to the default X509 format, we could have a custom per key type (space-efficient) serialiser.
|
||||
output.writeBytesWithLength(obj.encoded)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<PublicKey>): PublicKey {
|
||||
val A = input.readBytesWithLength()
|
||||
return Crypto.decodePublicKey(A)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for reading lists with number of elements at the beginning.
|
||||
* @param minLen minimum number of elements we expect for list to include, defaults to 1
|
||||
* @param expectedLen expected length of the list, defaults to null if arbitrary length list read
|
||||
*/
|
||||
inline fun <reified T> readListOfLength(kryo: Kryo, input: Input, minLen: Int = 1, expectedLen: Int? = null): List<T> {
|
||||
val elemCount = input.readInt()
|
||||
if (elemCount < minLen) throw KryoException("Cannot deserialize list, too little elements. Minimum required: $minLen, got: $elemCount")
|
||||
if (expectedLen != null && elemCount != expectedLen)
|
||||
throw KryoException("Cannot deserialize list, expected length: $expectedLen, got: $elemCount.")
|
||||
return (1..elemCount).map { kryo.readClassAndObject(input) as T }
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to disable whitelist checking during calls from our Kryo code to register a serializer, since it checks
|
||||
* for existing registrations and then will enter our [CordaClassResolver.getRegistration] method.
|
||||
*/
|
||||
open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapReferenceResolver()) {
|
||||
override fun register(type: Class<*>?): Registration {
|
||||
(classResolver as? CordaClassResolver)?.disableWhitelist()
|
||||
try {
|
||||
return super.register(type)
|
||||
} finally {
|
||||
(classResolver as? CordaClassResolver)?.enableWhitelist()
|
||||
}
|
||||
}
|
||||
|
||||
override fun register(type: Class<*>?, id: Int): Registration {
|
||||
(classResolver as? CordaClassResolver)?.disableWhitelist()
|
||||
try {
|
||||
return super.register(type, id)
|
||||
} finally {
|
||||
(classResolver as? CordaClassResolver)?.enableWhitelist()
|
||||
}
|
||||
}
|
||||
|
||||
override fun register(type: Class<*>?, serializer: Serializer<*>?): Registration {
|
||||
(classResolver as? CordaClassResolver)?.disableWhitelist()
|
||||
try {
|
||||
return super.register(type, serializer)
|
||||
} finally {
|
||||
(classResolver as? CordaClassResolver)?.enableWhitelist()
|
||||
}
|
||||
}
|
||||
|
||||
override fun register(registration: Registration?): Registration {
|
||||
(classResolver as? CordaClassResolver)?.disableWhitelist()
|
||||
try {
|
||||
return super.register(registration)
|
||||
} finally {
|
||||
(classResolver as? CordaClassResolver)?.enableWhitelist()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inline fun <T : Any> Kryo.register(
|
||||
type: KClass<T>,
|
||||
crossinline read: (Kryo, Input) -> T,
|
||||
crossinline write: (Kryo, Output, T) -> Unit): Registration {
|
||||
return register(
|
||||
type.java,
|
||||
object : Serializer<T>() {
|
||||
override fun read(kryo: Kryo, input: Input, clazz: Class<T>): T = read(kryo, input)
|
||||
override fun write(kryo: Kryo, output: Output, obj: T) = write(kryo, output, obj)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
|
||||
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
|
||||
* references will throw a stack overflow exception during serialisation.
|
||||
*/
|
||||
inline fun <reified T : Any> Kryo.noReferencesWithin() {
|
||||
register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java)))
|
||||
}
|
||||
|
||||
class NoReferencesSerializer<T>(private val baseSerializer: Serializer<T>) : Serializer<T>() {
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
|
||||
return kryo.withoutReferences { baseSerializer.read(kryo, input, type) }
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, obj: T) {
|
||||
kryo.withoutReferences { baseSerializer.write(kryo, output, obj) }
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> Kryo.withoutReferences(block: () -> T): T {
|
||||
val previousValue = setReferences(false)
|
||||
try {
|
||||
return block()
|
||||
} finally {
|
||||
references = previousValue
|
||||
}
|
||||
}
|
||||
|
||||
/** For serialising a Logger. */
|
||||
@ThreadSafe
|
||||
object LoggerSerializer : Serializer<Logger>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: Logger) {
|
||||
output.writeString(obj.name)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Logger>): Logger {
|
||||
return LoggerFactory.getLogger(input.readString())
|
||||
}
|
||||
}
|
||||
|
||||
object ClassSerializer : Serializer<Class<*>>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Class<*>>): Class<*> {
|
||||
val className = input.readString()
|
||||
return Class.forName(className, true, kryo.classLoader)
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, clazz: Class<*>) {
|
||||
output.writeString(clazz.name)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object CertPathSerializer : Serializer<CertPath>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<CertPath>): CertPath {
|
||||
val factory = CertificateFactory.getInstance(input.readString())
|
||||
return factory.generateCertPath(input.readBytesWithLength().inputStream())
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, obj: CertPath) {
|
||||
output.writeString(obj.type)
|
||||
output.writeBytesWithLength(obj.encoded)
|
||||
}
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
object X509CertificateSerializer : Serializer<X509Certificate>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<X509Certificate>): X509Certificate {
|
||||
return CertificateFactory.getInstance("X.509").generateCertificate(input.readBytesWithLength().inputStream()) as X509Certificate
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, obj: X509Certificate) {
|
||||
output.writeBytesWithLength(obj.encoded)
|
||||
}
|
||||
}
|
||||
|
||||
fun Kryo.serializationContext(): SerializeAsTokenContext? = context.get(serializationContextKey) as? SerializeAsTokenContext
|
||||
|
||||
/**
|
||||
* For serializing instances if [Throwable] honoring the fact that [java.lang.Throwable.suppressedExceptions]
|
||||
* might be un-initialized/empty.
|
||||
* In the absence of this class [CompatibleFieldSerializer] will be used which will assign a *new* instance of
|
||||
* unmodifiable collection to [java.lang.Throwable.suppressedExceptions] which will fail some sentinel identity checks
|
||||
* e.g. in [java.lang.Throwable.addSuppressed]
|
||||
*/
|
||||
@DeleteForDJVM
|
||||
@ThreadSafe
|
||||
class ThrowableSerializer<T>(kryo: Kryo, type: Class<T>) : Serializer<Throwable>(false, true) {
|
||||
|
||||
private companion object {
|
||||
private val suppressedField = Throwable::class.java.getDeclaredField("suppressedExceptions")
|
||||
|
||||
private val sentinelValue = let {
|
||||
val sentinelField = Throwable::class.java.getDeclaredField("SUPPRESSED_SENTINEL")
|
||||
sentinelField.isAccessible = true
|
||||
sentinelField.get(null)
|
||||
}
|
||||
|
||||
init {
|
||||
suppressedField.isAccessible = true
|
||||
}
|
||||
}
|
||||
|
||||
private val delegate: Serializer<Throwable> = uncheckedCast(ReflectionSerializerFactory.makeSerializer(kryo, FieldSerializer::class.java, type))
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, throwable: Throwable) {
|
||||
delegate.write(kryo, output, throwable)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Throwable>): Throwable {
|
||||
val throwableRead = delegate.read(kryo, input, type)
|
||||
if (throwableRead.suppressed.isEmpty()) {
|
||||
throwableRead.setSuppressedToSentinel()
|
||||
}
|
||||
return throwableRead
|
||||
}
|
||||
|
||||
private fun Throwable.setSuppressedToSentinel() = suppressedField.set(this, sentinelValue)
|
||||
}
|
||||
|
||||
/** For serializing the utility [LazyMappedList]. It will serialize the fully resolved object.*/
|
||||
@ThreadSafe
|
||||
@SuppressWarnings("ALL")
|
||||
object LazyMappedListSerializer : Serializer<List<*>>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: List<*>) = kryo.writeClassAndObject(output, obj.toList())
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<List<*>>) = kryo.readClassAndObject(input) as List<*>
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.CheckpointSerializer
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.serialization.internal.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
val kryoMagic = CordaSerializationMagic("corda".toByteArray() + byteArrayOf(0, 0))
|
||||
|
||||
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
|
||||
override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) {
|
||||
val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
|
||||
"Restoring such resources across node restarts is not supported. Make sure code accessing it is " +
|
||||
"confined to a private method or the reference is nulled out."
|
||||
throw UnsupportedOperationException(message)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!")
|
||||
}
|
||||
|
||||
object KryoCheckpointSerializer : CheckpointSerializer {
|
||||
private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>()
|
||||
|
||||
private fun getPool(context: CheckpointSerializationContext): KryoPool {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
KryoPool.Builder {
|
||||
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
||||
val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) }
|
||||
// TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that
|
||||
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
|
||||
serializer.kryo.apply {
|
||||
field.set(this, classResolver)
|
||||
// don't allow overriding the public key serializer for checkpointing
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
|
||||
classLoader = it.second
|
||||
}
|
||||
}.build()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T : Any> CheckpointSerializationContext.kryo(task: Kryo.() -> T): T {
|
||||
return getPool(this).run { kryo ->
|
||||
kryo.context.ensureCapacity(properties.size)
|
||||
properties.forEach { kryo.context.put(it.key, it.value) }
|
||||
try {
|
||||
kryo.task()
|
||||
} finally {
|
||||
kryo.context.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: CheckpointSerializationContext): T {
|
||||
val dataBytes = kryoMagic.consume(byteSequence)
|
||||
?: throw KryoException("Serialized bytes header does not match expected format.")
|
||||
return context.kryo {
|
||||
kryoInput(ByteBufferInputStream(dataBytes)) {
|
||||
val result: T
|
||||
loop@ while (true) {
|
||||
when (SectionId.reader.readFrom(this)) {
|
||||
SectionId.ENCODING -> {
|
||||
val encoding = CordaSerializationEncoding.reader.readFrom(this)
|
||||
context.encodingWhitelist.acceptEncoding(encoding) || throw KryoException(encodingNotPermittedFormat.format(encoding))
|
||||
substitute(encoding::wrap)
|
||||
}
|
||||
SectionId.DATA_AND_STOP, SectionId.ALT_DATA_AND_STOP -> {
|
||||
result = if (context.objectReferencesEnabled) {
|
||||
uncheckedCast(readClassAndObject(this))
|
||||
} else {
|
||||
withoutReferences { uncheckedCast<Any?, T>(readClassAndObject(this)) }
|
||||
}
|
||||
break@loop
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> serialize(obj: T, context: CheckpointSerializationContext): SerializedBytes<T> {
|
||||
return context.kryo {
|
||||
SerializedBytes(kryoOutput {
|
||||
kryoMagic.writeTo(this)
|
||||
context.encoding?.let { encoding ->
|
||||
SectionId.ENCODING.writeTo(this)
|
||||
(encoding as CordaSerializationEncoding).writeTo(this)
|
||||
substitute(encoding::wrap)
|
||||
}
|
||||
SectionId.ALT_DATA_AND_STOP.writeTo(this) // Forward-compatible in null-encoding case.
|
||||
if (context.objectReferencesEnabled) {
|
||||
writeClassAndObject(this, obj)
|
||||
} else {
|
||||
withoutReferences { writeClassAndObject(this, obj) }
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val KRYO_CHECKPOINT_CONTEXT = CheckpointSerializationContextImpl(
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
QuasarWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
null,
|
||||
AlwaysAcceptEncodingWhitelist
|
||||
)
|
@ -0,0 +1,43 @@
|
||||
@file:JvmName("KryoStreams")
|
||||
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.serialization.internal.byteArrayOutput
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
import java.io.SequenceInputStream
|
||||
|
||||
private val serializationBufferPool = LazyPool(
|
||||
newInstance = { ByteArray(64 * 1024) })
|
||||
|
||||
internal fun <T> kryoInput(underlying: InputStream, task: Input.() -> T): T {
|
||||
return serializationBufferPool.run {
|
||||
Input(it).use { input ->
|
||||
input.inputStream = underlying
|
||||
input.task()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal fun <T> kryoOutput(task: Output.() -> T): ByteArray {
|
||||
return byteArrayOutput { underlying ->
|
||||
serializationBufferPool.run {
|
||||
Output(it).use { output ->
|
||||
output.outputStream = underlying
|
||||
output.task()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal fun Output.substitute(transform: (OutputStream) -> OutputStream) {
|
||||
flush()
|
||||
outputStream = transform(outputStream)
|
||||
}
|
||||
|
||||
internal fun Input.substitute(transform: (InputStream) -> InputStream) {
|
||||
inputStream = transform(SequenceInputStream(buffer.copyOfRange(position(), limit()).inputStream(), inputStream))
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.serialization.SerializationToken
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
|
||||
/**
|
||||
* A Kryo serializer for [SerializeAsToken] implementations.
|
||||
*/
|
||||
class SerializeAsTokenSerializer<T : SerializeAsToken> : Serializer<T>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: T) {
|
||||
kryo.writeClassAndObject(output, obj.toToken(kryo.serializationContext()
|
||||
?: throw KryoException("Attempt to write a ${SerializeAsToken::class.simpleName} instance of ${obj.javaClass.name} without initialising a context")))
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
|
||||
val token = (kryo.readClassAndObject(input) as? SerializationToken)
|
||||
?: throw KryoException("Non-token read for tokenized type: ${type.name}")
|
||||
val fromToken = token.fromToken(kryo.serializationContext()
|
||||
?: throw KryoException("Attempt to read a token for a ${SerializeAsToken::class.simpleName} instance of ${type.name} without initialising a context"))
|
||||
return type.castIfPossible(fromToken)
|
||||
?: throw KryoException("Token read ($token) did not return expected tokenized type: ${type.name}")
|
||||
}
|
||||
}
|
@ -25,7 +25,7 @@ import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.MockCordappConfigProvider
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.coretesting.internal.rigorousMock
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import net.corda.testing.services.MockAttachmentStorage
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
|
@ -11,8 +11,8 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.TestNodeInfoBuilder
|
||||
import net.corda.testing.internal.signWith
|
||||
import net.corda.coretesting.internal.TestNodeInfoBuilder
|
||||
import net.corda.coretesting.internal.signWith
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Rule
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.nodeapi.internal.crypto
|
||||
|
||||
import net.corda.core.crypto.internal.AliasPrivateKey
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
|
@ -21,7 +21,7 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.hours
|
||||
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities.DEFAULT_IDENTITY_SIGNATURE_SCHEME
|
||||
@ -37,11 +37,11 @@ import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.internal.NettyTestClient
|
||||
import net.corda.testing.internal.NettyTestHandler
|
||||
import net.corda.testing.internal.NettyTestServer
|
||||
import net.corda.coretesting.internal.NettyTestClient
|
||||
import net.corda.coretesting.internal.NettyTestHandler
|
||||
import net.corda.coretesting.internal.NettyTestServer
|
||||
import net.corda.testing.internal.createDevIntermediateCaCertPath
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.i2p.crypto.eddsa.EdDSAPrivateKey
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x509.*
|
||||
|
@ -12,7 +12,7 @@ import net.corda.nodeapi.internal.cryptoservice.CryptoServiceException
|
||||
import net.corda.nodeapi.internal.cryptoservice.WrappedPrivateKey
|
||||
import net.corda.nodeapi.internal.cryptoservice.WrappingMode
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
|
@ -18,8 +18,12 @@ import net.corda.nodeapi.internal.config.toConfig
|
||||
import net.corda.nodeapi.internal.network.NetworkBootstrapper.Companion.DEFAULT_MAX_MESSAGE_SIZE
|
||||
import net.corda.nodeapi.internal.network.NetworkBootstrapper.Companion.DEFAULT_MAX_TRANSACTION_SIZE
|
||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.internal.createNodeInfoAndSigned
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.coretesting.internal.createNodeInfoAndSigned
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
|
@ -0,0 +1,99 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import co.paralleluniverse.common.util.SameThreadExecutor
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.toSynchronised
|
||||
import net.corda.nodeapi.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.testutils.serializationContext
|
||||
import net.corda.nodeapi.internal.serialization.amqp.RpcServerObservableSerializer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.rpc.ObservableSubscription
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.Test
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import net.corda.nodeapi.internal.rpc.client.ObservableContext as ClientObservableContext
|
||||
import net.corda.nodeapi.internal.serialization.testutils.TestObservableContext as ServerObservableContext
|
||||
|
||||
class RoundTripObservableSerializerTests {
|
||||
private fun getID() = Trace.InvocationId("test1", Instant.now())
|
||||
|
||||
private fun subscriptionMap(
|
||||
id: Trace.InvocationId
|
||||
) : Cache<Trace.InvocationId, ObservableSubscription> {
|
||||
val subMap: Cache<Trace.InvocationId, ObservableSubscription> = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
|
||||
.maximumSize(100)
|
||||
.build()
|
||||
|
||||
subMap.put(id, ObservableSubscription(mock()))
|
||||
|
||||
return subMap
|
||||
}
|
||||
|
||||
private val observablesToReap = ThreadBox(object {
|
||||
var observables = ArrayList<Trace.InvocationId>()
|
||||
})
|
||||
|
||||
private fun createRpcObservableMap(): Cache<Trace.InvocationId, UnicastSubject<Notification<*>>> {
|
||||
val onObservableRemove = RemovalListener<Trace.InvocationId, UnicastSubject<Notification<*>>> { key, _, _ ->
|
||||
val observableId = key!!
|
||||
|
||||
observablesToReap.locked { observables.add(observableId) }
|
||||
}
|
||||
|
||||
return Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun roundTripTest1() {
|
||||
val serializationScheme = AMQPRoundTripRPCSerializationScheme(
|
||||
serializationContext, emptySet(), emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
|
||||
|
||||
// Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap,
|
||||
// the client as a property of the deserializer which, in the actual RPC client, is pulled off of
|
||||
// the received message
|
||||
val id : Trace.InvocationId = getID()
|
||||
|
||||
val serverObservableContext = ServerObservableContext(
|
||||
subscriptionMap(id),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString("clientAddress"))
|
||||
|
||||
val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext)
|
||||
|
||||
val clientObservableContext = ClientObservableContext(
|
||||
callSiteMap = null,
|
||||
observableMap = createRpcObservableMap(),
|
||||
hardReferenceStore = Collections.synchronizedSet(mutableSetOf<Observable<*>>())
|
||||
)
|
||||
|
||||
val clientSerializer = serializationScheme.rpcClientSerializerFactory(clientObservableContext, id)
|
||||
|
||||
|
||||
// What we're actually going to serialize then deserialize
|
||||
val obs = Observable.unsafeCreate<Int> { Math.random() }
|
||||
|
||||
val serverSerializationContext = RpcServerObservableSerializer.createContext(
|
||||
serializationContext, serverObservableContext)
|
||||
|
||||
val clientSerializationContext = RpcClientObservableDeSerializer.createContext(
|
||||
serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id)
|
||||
|
||||
|
||||
val blob = SerializationOutput(serverSerializer).serialize(obs, serverSerializationContext)
|
||||
DeserializationInput(clientSerializer).deserialize(blob, clientSerializationContext)
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.nodeapi.internal.serialization.testutils.TestObservableContext
|
||||
import net.corda.nodeapi.internal.serialization.testutils.serializationContext
|
||||
import net.corda.nodeapi.internal.rpc.ObservableSubscription
|
||||
import net.corda.nodeapi.internal.serialization.amqp.RpcServerObservableSerializer
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
import net.corda.serialization.internal.amqp.SerializationOutput
|
||||
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class RpcServerObservableSerializerTests {
|
||||
|
||||
private fun subscriptionMap(): Cache<Trace.InvocationId, ObservableSubscription> {
|
||||
val subMap: Cache<Trace.InvocationId, ObservableSubscription> = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
|
||||
.maximumSize(100)
|
||||
.build()
|
||||
|
||||
subMap.put(Trace.InvocationId("test1", Instant.now()), ObservableSubscription(mock()))
|
||||
|
||||
return subMap
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun canSerializerBeRegistered() {
|
||||
val sf = SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader)
|
||||
|
||||
try {
|
||||
sf.register(RpcServerObservableSerializer())
|
||||
} catch (e: Exception) {
|
||||
throw Error("Observable serializer must be registerable with factory, unexpected exception - ${e.message}")
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun canAssociateWithContext() {
|
||||
val observable = TestObservableContext(
|
||||
subscriptionMap(),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString("clientAddress"))
|
||||
|
||||
val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable)
|
||||
|
||||
assertEquals(1, newContext.properties.size)
|
||||
assertTrue(newContext.properties.containsKey(RpcServerObservableSerializer.RpcObservableContextKey))
|
||||
assertEquals(observable, newContext.properties[RpcServerObservableSerializer.RpcObservableContextKey])
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun serialiseFakeObservable() {
|
||||
val testClientAddress = "clientAddres"
|
||||
val observable = TestObservableContext(
|
||||
subscriptionMap(),
|
||||
clientAddressToObservables = ConcurrentHashMap(),
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString(testClientAddress))
|
||||
|
||||
val sf = SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
}
|
||||
|
||||
val obs = Observable.unsafeCreate<Int> { Math.random() }
|
||||
val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable)
|
||||
|
||||
try {
|
||||
SerializationOutput(sf).serializeAndReturnSchema(obs, newContext)
|
||||
} catch (e: Exception) {
|
||||
throw Error("Serialization of observable should not throw - ${e.message}")
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,98 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.serialization.internal.ByteBufferOutputStream
|
||||
import org.assertj.core.api.Assertions.catchThrowable
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Test
|
||||
import java.io.*
|
||||
import java.nio.BufferOverflowException
|
||||
import java.util.*
|
||||
import java.util.zip.DeflaterOutputStream
|
||||
import java.util.zip.InflaterInputStream
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertSame
|
||||
|
||||
class KryoStreamsTest {
|
||||
class NegOutputStream(private val stream: OutputStream) : OutputStream() {
|
||||
override fun write(b: Int) = stream.write(-b)
|
||||
}
|
||||
|
||||
class NegInputStream(private val stream: InputStream) : InputStream() {
|
||||
override fun read() = stream.read().let {
|
||||
if (it != -1) 0xff and -it else -1
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `substitute output works`() {
|
||||
assertArrayEquals(byteArrayOf(100, -101), kryoOutput {
|
||||
write(100)
|
||||
substitute(KryoStreamsTest::NegOutputStream)
|
||||
write(101)
|
||||
})
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `substitute input works`() {
|
||||
kryoInput(byteArrayOf(100, 101).inputStream()) {
|
||||
assertEquals(100, read())
|
||||
substitute(KryoStreamsTest::NegInputStream)
|
||||
assertEquals(-101, read().toByte())
|
||||
assertEquals(-1, read())
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `zip round-trip`() {
|
||||
val data = ByteArray(12345).also { Random(0).nextBytes(it) }
|
||||
val encoded = kryoOutput {
|
||||
write(data)
|
||||
substitute(::DeflaterOutputStream)
|
||||
write(data)
|
||||
substitute(::DeflaterOutputStream) // Potentially useful if a different codec.
|
||||
write(data)
|
||||
}
|
||||
kryoInput(encoded.inputStream()) {
|
||||
assertArrayEquals(data, readBytes(data.size))
|
||||
substitute(::InflaterInputStream)
|
||||
assertArrayEquals(data, readBytes(data.size))
|
||||
substitute(::InflaterInputStream)
|
||||
assertArrayEquals(data, readBytes(data.size))
|
||||
assertEquals(-1, read())
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `ByteBufferOutputStream works`() {
|
||||
val stream = ByteBufferOutputStream(3)
|
||||
stream.write("abc".toByteArray())
|
||||
val getBuf = stream.declaredField<ByteArray>(ByteArrayOutputStream::class, "buf")::value
|
||||
assertEquals(3, getBuf().size)
|
||||
repeat(2) {
|
||||
assertSame<Any>(BufferOverflowException::class.java, catchThrowable {
|
||||
stream.alsoAsByteBuffer(9) {
|
||||
it.put("0123456789".toByteArray())
|
||||
}
|
||||
}.javaClass)
|
||||
assertEquals(3 + 9, getBuf().size)
|
||||
}
|
||||
// This time make too much space:
|
||||
stream.alsoAsByteBuffer(11) {
|
||||
it.put("0123456789".toByteArray())
|
||||
}
|
||||
stream.write("def".toByteArray())
|
||||
assertArrayEquals("abc0123456789def".toByteArray(), stream.toByteArray())
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `ByteBufferOutputStream discards data after final position`() {
|
||||
val stream = ByteBufferOutputStream(0)
|
||||
stream.alsoAsByteBuffer(10) {
|
||||
it.put("0123456789".toByteArray())
|
||||
it.position(5)
|
||||
}
|
||||
stream.write("def".toByteArray())
|
||||
assertArrayEquals("01234def".toByteArray(), stream.toByteArray())
|
||||
}
|
||||
}
|
@ -0,0 +1,364 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.KryoSerializable
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.google.common.primitives.Ints
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.checkpointDeserialize
|
||||
import net.corda.core.serialization.internal.checkpointSerialize
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.serialization.internal.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule
|
||||
import net.corda.coretesting.internal.rigorousMock
|
||||
import org.apache.commons.lang3.SystemUtils
|
||||
import org.assertj.core.api.Assertions.*
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
import org.junit.runners.Parameterized.Parameters
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.InputStream
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.test.*
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
companion object {
|
||||
private val ALICE_PUBKEY = TestIdentity(ALICE_NAME, 70).publicKey
|
||||
@Parameters(name = "{0}")
|
||||
@JvmStatic
|
||||
fun compression() = arrayOf<CordaSerializationEncoding?>(null) + CordaSerializationEncoding.values()
|
||||
}
|
||||
|
||||
@get:Rule
|
||||
val serializationRule = CheckpointSerializationEnvironmentRule()
|
||||
private lateinit var context: CheckpointSerializationContext
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
context = CheckpointSerializationContextImpl(
|
||||
javaClass.classLoader,
|
||||
AllWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
compression,
|
||||
rigorousMock<EncodingWhitelist>().also {
|
||||
if (compression != null) doReturn(true).whenever(it).acceptEncoding(compression)
|
||||
})
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `simple data class`() {
|
||||
val birthday = Instant.parse("1984-04-17T00:30:00.00Z")
|
||||
val mike = Person("mike", birthday)
|
||||
val bits = mike.checkpointSerialize(context)
|
||||
assertThat(bits.checkpointDeserialize(context)).isEqualTo(Person("mike", birthday))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `null values`() {
|
||||
val bob = Person("bob", null)
|
||||
val bits = bob.checkpointSerialize(context)
|
||||
assertThat(bits.checkpointDeserialize(context)).isEqualTo(Person("bob", null))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
|
||||
val noReferencesContext = context.withoutReferences()
|
||||
val obj : ByteSequence = Ints.toByteArray(0x01234567).sequence()
|
||||
val originalList : ArrayList<ByteSequence> = ArrayList<ByteSequence>().apply { this += obj }
|
||||
val deserialisedList = originalList.checkpointSerialize(noReferencesContext).checkpointDeserialize(noReferencesContext)
|
||||
originalList += obj
|
||||
deserialisedList += obj
|
||||
assertThat(deserialisedList.checkpointSerialize(noReferencesContext)).isEqualTo(originalList.checkpointSerialize(noReferencesContext))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialised form is stable when the same object instance occurs more than once, and using java serialisation`() {
|
||||
val noReferencesContext = context.withoutReferences()
|
||||
val instant = Instant.ofEpochMilli(123)
|
||||
val instantCopy = Instant.ofEpochMilli(123)
|
||||
assertThat(instant).isNotSameAs(instantCopy)
|
||||
val listWithCopies = ArrayList<Instant>().apply {
|
||||
this += instant
|
||||
this += instantCopy
|
||||
}
|
||||
val listWithSameInstances = ArrayList<Instant>().apply {
|
||||
this += instant
|
||||
this += instant
|
||||
}
|
||||
assertThat(listWithSameInstances.checkpointSerialize(noReferencesContext)).isEqualTo(listWithCopies.checkpointSerialize(noReferencesContext))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `cyclic object graph`() {
|
||||
val cyclic = Cyclic(3)
|
||||
val bits = cyclic.checkpointSerialize(context)
|
||||
assertThat(bits.checkpointDeserialize(context)).isEqualTo(cyclic)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `deserialised key pair functions the same as serialised one`() {
|
||||
val keyPair = generateKeyPair()
|
||||
val bitsToSign: ByteArray = Ints.toByteArray(0x01234567)
|
||||
val wrongBits: ByteArray = Ints.toByteArray(0x76543210)
|
||||
val signature = keyPair.sign(bitsToSign)
|
||||
signature.verify(bitsToSign)
|
||||
assertThatThrownBy { signature.verify(wrongBits) }
|
||||
|
||||
val deserialisedKeyPair = keyPair.checkpointSerialize(context).checkpointDeserialize(context)
|
||||
val deserialisedSignature = deserialisedKeyPair.sign(bitsToSign)
|
||||
deserialisedSignature.verify(bitsToSign)
|
||||
assertThatThrownBy { deserialisedSignature.verify(wrongBits) }
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `write and read Kotlin object singleton`() {
|
||||
val serialised = TestSingleton.checkpointSerialize(context)
|
||||
val deserialised = serialised.checkpointDeserialize(context)
|
||||
assertThat(deserialised).isSameAs(TestSingleton)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `check Kotlin EmptyList can be serialised`() {
|
||||
val deserialisedList: List<Int> = emptyList<Int>().checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(0, deserialisedList.size)
|
||||
assertEquals<Any>(Collections.emptyList<Int>().javaClass, deserialisedList.javaClass)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `check Kotlin EmptySet can be serialised`() {
|
||||
val deserialisedSet: Set<Int> = emptySet<Int>().checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(0, deserialisedSet.size)
|
||||
assertEquals<Any>(Collections.emptySet<Int>().javaClass, deserialisedSet.javaClass)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `check Kotlin EmptyMap can be serialised`() {
|
||||
val deserialisedMap: Map<Int, Int> = emptyMap<Int, Int>().checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(0, deserialisedMap.size)
|
||||
assertEquals<Any>(Collections.emptyMap<Int, Int>().javaClass, deserialisedMap.javaClass)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `InputStream serialisation`() {
|
||||
val rubbish = ByteArray(12345) { (it * it * 0.12345).toByte() }
|
||||
val readRubbishStream: InputStream = rubbish.inputStream().checkpointSerialize(context).checkpointDeserialize(context)
|
||||
for (i in 0..12344) {
|
||||
assertEquals(rubbish[i], readRubbishStream.read().toByte())
|
||||
}
|
||||
assertEquals(-1, readRubbishStream.read())
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `InputStream serialisation does not write trailing garbage`() {
|
||||
val byteArrays = listOf("123", "456").map { it.toByteArray() }
|
||||
val streams = byteArrays.map { it.inputStream() }.checkpointSerialize(context).checkpointDeserialize(context).iterator()
|
||||
byteArrays.forEach { assertArrayEquals(it, streams.next().readBytes()) }
|
||||
assertFalse(streams.hasNext())
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize SignableData`() {
|
||||
val testString = "Hello World"
|
||||
val testBytes = testString.toByteArray()
|
||||
|
||||
val meta = SignableData(testBytes.sha256(), SignatureMetadata(1, Crypto.findSignatureScheme(ALICE_PUBKEY).schemeNumberID))
|
||||
val serializedMetaData = meta.checkpointSerialize(context).bytes
|
||||
val meta2 = serializedMetaData.checkpointDeserialize<SignableData>(context)
|
||||
assertEquals(meta2, meta)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize Logger`() {
|
||||
val storageContext: CheckpointSerializationContext = context
|
||||
val logger = LoggerFactory.getLogger("aName")
|
||||
val logger2 = logger.checkpointSerialize(storageContext).checkpointDeserialize(storageContext)
|
||||
assertEquals(logger.name, logger2.name)
|
||||
assertTrue(logger === logger2)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `HashCheckingStream (de)serialize`() {
|
||||
val rubbish = ByteArray(12345) { (it * it * 0.12345).toByte() }
|
||||
val readRubbishStream: InputStream = NodeAttachmentService.HashCheckingStream(
|
||||
SecureHash.sha256(rubbish),
|
||||
rubbish.size,
|
||||
rubbish.inputStream()
|
||||
).checkpointSerialize(context).checkpointDeserialize(context)
|
||||
for (i in 0..12344) {
|
||||
assertEquals(rubbish[i], readRubbishStream.read().toByte())
|
||||
}
|
||||
assertEquals(-1, readRubbishStream.read())
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
private data class Person(val name: String, val birthday: Instant?)
|
||||
|
||||
@Suppress("unused")
|
||||
@CordaSerializable
|
||||
private class Cyclic(val value: Int) {
|
||||
val thisInstance = this
|
||||
override fun equals(other: Any?): Boolean = (this === other) || (other is Cyclic && this.value == other.value)
|
||||
override fun hashCode(): Int = value.hashCode()
|
||||
override fun toString(): String = "Cyclic($value)"
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize PrivacySalt`() {
|
||||
val expected = PrivacySalt(byteArrayOf(
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
|
||||
11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
|
||||
21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
|
||||
31, 32
|
||||
))
|
||||
val serializedBytes = expected.checkpointSerialize(context)
|
||||
val actual = serializedBytes.checkpointDeserialize(context)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
private object TestSingleton
|
||||
|
||||
object SimpleSteps {
|
||||
object ONE : ProgressTracker.Step("one")
|
||||
object TWO : ProgressTracker.Step("two")
|
||||
object THREE : ProgressTracker.Step("three")
|
||||
object FOUR : ProgressTracker.Step("four")
|
||||
|
||||
fun tracker() = ProgressTracker(ONE, TWO, THREE, FOUR)
|
||||
}
|
||||
|
||||
object ChildSteps {
|
||||
object AYY : ProgressTracker.Step("ayy")
|
||||
object BEE : ProgressTracker.Step("bee")
|
||||
object SEA : ProgressTracker.Step("sea")
|
||||
|
||||
fun tracker() = ProgressTracker(AYY, BEE, SEA)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun rxSubscriptionsAreNotSerialized() {
|
||||
val pt: ProgressTracker = SimpleSteps.tracker()
|
||||
val pt2: ProgressTracker = ChildSteps.tracker()
|
||||
|
||||
class Unserializable : KryoSerializable {
|
||||
override fun write(kryo: Kryo?, output: Output?) = throw AssertionError("not called")
|
||||
override fun read(kryo: Kryo?, input: Input?) = throw AssertionError("not called")
|
||||
|
||||
fun foo() {
|
||||
println("bar")
|
||||
}
|
||||
}
|
||||
|
||||
pt.setChildProgressTracker(SimpleSteps.TWO, pt2)
|
||||
class Tmp {
|
||||
val unserializable = Unserializable()
|
||||
|
||||
init {
|
||||
pt2.changes.subscribe { unserializable.foo() }
|
||||
}
|
||||
}
|
||||
Tmp()
|
||||
val context = CheckpointSerializationContextImpl(
|
||||
javaClass.classLoader,
|
||||
AllWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
null)
|
||||
pt.checkpointSerialize(context)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize Exception with suppressed`() {
|
||||
val exception = IllegalArgumentException("fooBar")
|
||||
val toBeSuppressedOnSenderSide = IllegalStateException("bazz1")
|
||||
exception.addSuppressed(toBeSuppressedOnSenderSide)
|
||||
val exception2 = exception.checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(exception.message, exception2.message)
|
||||
|
||||
assertEquals(1, exception2.suppressed.size)
|
||||
assertNotNull({ exception2.suppressed.find { it.message == toBeSuppressedOnSenderSide.message } })
|
||||
|
||||
val toBeSuppressedOnReceiverSide = IllegalStateException("bazz2")
|
||||
exception2.addSuppressed(toBeSuppressedOnReceiverSide)
|
||||
assertTrue { exception2.suppressed.contains(toBeSuppressedOnReceiverSide) }
|
||||
assertEquals(2, exception2.suppressed.size)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize Exception no suppressed`() {
|
||||
val exception = IllegalArgumentException("fooBar")
|
||||
val exception2 = exception.checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(0, exception2.suppressed.size)
|
||||
|
||||
val toBeSuppressedOnReceiverSide = IllegalStateException("bazz2")
|
||||
exception2.addSuppressed(toBeSuppressedOnReceiverSide)
|
||||
assertEquals(1, exception2.suppressed.size)
|
||||
assertTrue { exception2.suppressed.contains(toBeSuppressedOnReceiverSide) }
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `serialize - deserialize HashNotFound`() {
|
||||
val randomHash = SecureHash.randomSHA256()
|
||||
val exception = FetchDataFlow.HashNotFound(randomHash)
|
||||
val exception2 = exception.checkpointSerialize(context).checkpointDeserialize(context)
|
||||
assertEquals(randomHash, exception2.requested)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `compression has the desired effect`() {
|
||||
compression ?: return
|
||||
val data = ByteArray(12345).also { Random(0).nextBytes(it) }.let { it + it }
|
||||
val compressed = data.checkpointSerialize(context)
|
||||
assertEquals(.5, compressed.size.toDouble() / data.size, .03)
|
||||
assertArrayEquals(data, compressed.checkpointDeserialize(context))
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `a particular encoding can be banned for deserialization`() {
|
||||
compression ?: return
|
||||
doReturn(false).whenever(context.encodingWhitelist).acceptEncoding(compression)
|
||||
val compressed = "whatever".checkpointSerialize(context)
|
||||
catchThrowable { compressed.checkpointDeserialize(context) }.run {
|
||||
assertSame<Any>(KryoException::class.java, javaClass)
|
||||
assertEquals(encodingNotPermittedFormat.format(compression), message)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `compression reduces number of bytes significantly`() {
|
||||
class Holder(val holder: ByteArray)
|
||||
|
||||
val obj = Holder(ByteArray(20000))
|
||||
val uncompressedSize = obj.checkpointSerialize(context.withEncoding(null)).size
|
||||
val compressedSize = obj.checkpointSerialize(context.withEncoding(CordaSerializationEncoding.SNAPPY)).size
|
||||
// If these need fixing, sounds like Kryo wire format changed and checkpoints might not survive an upgrade.
|
||||
if (SystemUtils.IS_JAVA_11)
|
||||
assertEquals(20184, uncompressedSize)
|
||||
else
|
||||
assertEquals(20234, uncompressedSize)
|
||||
assertEquals(1123, compressedSize)
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
package net.corda.nodeapi.internal.serialization.testutils
|
||||
|
||||
import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.amqp.RpcServerObservableSerializer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.serialization.internal.CordaSerializationMagic
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import net.corda.nodeapi.internal.rpc.client.ObservableContext as ClientObservableContext
|
||||
|
||||
/**
|
||||
* Special serialization context for the round trip tests that allows for both server and client RPC
|
||||
* operations
|
||||
*/
|
||||
|
||||
|
||||
class AMQPRoundTripRPCSerializationScheme(
|
||||
private val serializationContext: SerializationContext,
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
|
||||
cordappSerializationWhitelists: Set<SerializationWhitelist>,
|
||||
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>)
|
||||
: AbstractAMQPSerializationScheme(
|
||||
cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts
|
||||
) {
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcClientObservableDeSerializer)
|
||||
}
|
||||
}
|
||||
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
}
|
||||
}
|
||||
|
||||
override fun canDeserializeVersion(
|
||||
magic: CordaSerializationMagic,
|
||||
target: SerializationContext.UseCase) = true
|
||||
|
||||
fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) =
|
||||
rpcClientSerializerFactory(
|
||||
RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
|
||||
.withProperty(RPCApi.RpcRequestOrObservableIdKey, id))
|
||||
|
||||
fun rpcServerSerializerFactory(observableContext: TestObservableContext) =
|
||||
rpcServerSerializerFactory(
|
||||
RpcServerObservableSerializer.createContext(serializationContext, observableContext))
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package net.corda.nodeapi.internal.serialization.testutils
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.nodeapi.internal.rpc.ObservableContextInterface
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.rpc.ObservableSubscription
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class TestObservableContext(
|
||||
override val observableMap: Cache<Trace.InvocationId, ObservableSubscription>,
|
||||
override val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<Trace.InvocationId>>,
|
||||
override val deduplicationIdentity: String,
|
||||
override val clientAddress: SimpleString
|
||||
) : ObservableContextInterface {
|
||||
override fun sendMessage(serverToClient: RPCApi.ServerToClient) { }
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package net.corda.nodeapi.internal.serialization.testutils
|
||||
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
import net.corda.serialization.internal.SerializationContextImpl
|
||||
import net.corda.serialization.internal.amqp.amqpMagic
|
||||
|
||||
val serializationProperties: MutableMap<Any, Any> = mutableMapOf()
|
||||
|
||||
val serializationContext = SerializationContextImpl(
|
||||
preferredSerializationVersion = amqpMagic,
|
||||
deserializationClassLoader = ClassLoader.getSystemClassLoader(),
|
||||
whitelist = AllWhitelist,
|
||||
properties = serializationProperties,
|
||||
objectReferencesEnabled = false,
|
||||
useCase = SerializationContext.UseCase.Testing,
|
||||
encoding = null
|
||||
)
|
Reference in New Issue
Block a user