mirror of
https://github.com/corda/corda.git
synced 2025-01-22 20:38:05 +00:00
Corda-847 - Remove Kryo for RPC
It's no longer used as we've switched over to AMQP for RPC calls so remove it from everywhere and only use it for checkpointing * Wire up demo bench post Kryo removal * Test Fixes * rebase and fix tests * Test Fix * wip * revert changes to api now we don't need to add annotations
This commit is contained in:
parent
f850daa582
commit
0c3a30edc8
@ -76,16 +76,10 @@ public final class net.corda.core.concurrent.ConcurrencyUtils extends java.lang.
|
||||
@NotNull
|
||||
public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
|
||||
##
|
||||
<<<<<<< HEAD
|
||||
public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future
|
||||
public abstract void then(kotlin.jvm.functions.Function1<? super net.corda.core.concurrent.CordaFuture<V>, ? extends W>)
|
||||
@NotNull
|
||||
public abstract java.util.concurrent.CompletableFuture<V> toCompletableFuture()
|
||||
=======
|
||||
@net.corda.core.serialization.CordaSerializable public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future
|
||||
public abstract void then(kotlin.jvm.functions.Function1)
|
||||
@org.jetbrains.annotations.NotNull public abstract concurrent.CompletableFuture toCompletableFuture()
|
||||
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||
##
|
||||
@CordaSerializable
|
||||
public final class net.corda.core.context.Actor extends java.lang.Object
|
||||
@ -2621,12 +2615,8 @@ public final class net.corda.core.messaging.DataFeed extends java.lang.Object
|
||||
public int hashCode()
|
||||
public String toString()
|
||||
##
|
||||
<<<<<<< HEAD
|
||||
@DoNotImplement
|
||||
public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable
|
||||
=======
|
||||
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable
|
||||
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||
public abstract void close()
|
||||
@NotNull
|
||||
public abstract net.corda.core.flows.StateMachineRunId getId()
|
||||
@ -2652,12 +2642,8 @@ public final class net.corda.core.messaging.FlowHandleImpl extends java.lang.Obj
|
||||
public int hashCode()
|
||||
public String toString()
|
||||
##
|
||||
<<<<<<< HEAD
|
||||
@DoNotImplement
|
||||
public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle
|
||||
=======
|
||||
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle
|
||||
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||
public abstract void close()
|
||||
@NotNull
|
||||
public abstract rx.Observable<String> getProgress()
|
||||
|
@ -1,55 +0,0 @@
|
||||
package net.corda.client.rpc.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||
|
||||
private val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
net.corda.core.serialization.SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCClient,
|
||||
null)
|
||||
|
||||
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
|
||||
}
|
||||
|
||||
override fun rpcClientKryoPool(context: SerializationContext): KryoPool {
|
||||
return KryoPool.Builder {
|
||||
DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context), publicKeySerializer).apply {
|
||||
classLoader = context.deserializationClassLoader
|
||||
}
|
||||
}.build()
|
||||
}
|
||||
|
||||
// We're on the client and don't have access to server classes.
|
||||
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
companion object {
|
||||
/** Call from main only. */
|
||||
fun initialiseSerialization(classLoader: ClassLoader? = null) {
|
||||
nodeSerializationEnv = createSerializationEnv(classLoader)
|
||||
}
|
||||
|
||||
fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment {
|
||||
return SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||
},
|
||||
if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT,
|
||||
rpcClientContext = if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
package net.corda.client.rpc.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.client.rpc.internal.ObservableContext
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.subjects.UnicastSubject
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/**
|
||||
* A [Serializer] to deserialize Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
|
||||
*/
|
||||
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
fun createContext(
|
||||
serializationContext: SerializationContext,
|
||||
observableContext: ObservableContext
|
||||
): SerializationContext {
|
||||
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
|
||||
val refCount = AtomicInteger(0)
|
||||
return observable.doOnSubscribe {
|
||||
if (refCount.getAndIncrement() == 0) {
|
||||
require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" }
|
||||
}
|
||||
}.doOnUnsubscribe {
|
||||
if (refCount.decrementAndGet() == 0) {
|
||||
require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
|
||||
val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.")
|
||||
val observable = UnicastSubject.create<Notification<*>>()
|
||||
require(observableContext.observableMap.getIfPresent(observableId) == null) {
|
||||
"Multiple Observables arrived with the same ID $observableId"
|
||||
}
|
||||
val rpcCallSite = getRpcCallSite(kryo, observableContext)
|
||||
observableContext.observableMap.put(observableId, observable)
|
||||
observableContext.callSiteMap?.put(observableId, rpcCallSite)
|
||||
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
|
||||
// don't need to store a reference to the Observables themselves.
|
||||
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
|
||||
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
|
||||
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
|
||||
// The unsubscribe is due to [ObservableToFuture]'s use of first().
|
||||
observableContext.observableMap.invalidate(observableId)
|
||||
}.dematerialize()
|
||||
}
|
||||
|
||||
private fun Input.readInvocationId(): Trace.InvocationId? {
|
||||
|
||||
val value = readString() ?: return null
|
||||
val timestamp = readLong()
|
||||
return Trace.InvocationId(value, Instant.ofEpochMilli(timestamp))
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
throw UnsupportedOperationException("Cannot serialise Observables on the client side")
|
||||
}
|
||||
|
||||
private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? {
|
||||
val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
|
||||
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
|
||||
}
|
||||
}
|
@ -2,16 +2,20 @@ package net.corda.core.utilities
|
||||
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.ExpectedException
|
||||
|
||||
object EmptyWhitelist : ClassWhitelist {
|
||||
override fun hasListed(type: Class<*>): Boolean = false
|
||||
}
|
||||
|
||||
class KotlinUtilsTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
@ -20,6 +24,14 @@ class KotlinUtilsTest {
|
||||
@Rule
|
||||
val expectedEx: ExpectedException = ExpectedException.none()
|
||||
|
||||
val KRYO_CHECKPOINT_NOWHITELIST_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
EmptyWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Checkpoint,
|
||||
null)
|
||||
|
||||
@Test
|
||||
fun `transient property which is null`() {
|
||||
val test = NullTransientProperty()
|
||||
@ -43,7 +55,7 @@ class KotlinUtilsTest {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
|
||||
val original = NonCapturingTransientProperty()
|
||||
original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize()
|
||||
original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -61,8 +73,10 @@ class KotlinUtilsTest {
|
||||
fun `deserialise transient property with capturing lambda`() {
|
||||
expectedEx.expect(KryoException::class.java)
|
||||
expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization")
|
||||
|
||||
val original = CapturingTransientProperty("Hello")
|
||||
original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize()
|
||||
|
||||
original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT)
|
||||
}
|
||||
|
||||
private class NullTransientProperty {
|
||||
|
@ -6,6 +6,10 @@ release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
==========
|
||||
|
||||
* RPC Framework moved from Kryo to the Corda AMQP implementation [Corda-847]. This completes the removal
|
||||
of ``Kryo`` from general use within Corda, remaining only for use in flow checkpointing.
|
||||
|
||||
* Set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode.
|
||||
|
||||
* Node will now gracefully fail to start if one of the required ports is already in use.
|
||||
|
@ -114,7 +114,8 @@ open class SerializationFactoryImpl(
|
||||
val lookupKey = magic to target
|
||||
return schemes.computeIfAbsent(lookupKey) {
|
||||
registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single?
|
||||
logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes")
|
||||
logger.warn("Cannot find serialization scheme for: [$lookupKey, " +
|
||||
"${if (magic == amqpMagic) "AMQP" else if (magic == kryoMagic) "Kryo" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes")
|
||||
throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.")
|
||||
} to magic
|
||||
}
|
||||
|
@ -285,7 +285,11 @@ open class SerializerFactory(
|
||||
}
|
||||
|
||||
private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer<Any> = serializersByType.computeIfAbsent(type) {
|
||||
if (isPrimitive(clazz)) {
|
||||
if (clazz.isSynthetic) {
|
||||
// Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also
|
||||
// captures Lambda expressions and other anonymous functions
|
||||
throw NotSerializableException(type.typeName)
|
||||
} else if (isPrimitive(clazz)) {
|
||||
AMQPPrimitiveSerializer(clazz)
|
||||
} else {
|
||||
findCustomSerializer(clazz, declaredType) ?: run {
|
||||
|
@ -7,7 +7,6 @@ import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
|
||||
import com.esotericsoftware.kryo.serializers.FieldSerializer
|
||||
import com.esotericsoftware.kryo.util.MapReferenceResolver
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -18,8 +17,6 @@ import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint
|
||||
import net.corda.core.serialization.SerializationContext.UseCase.Storage
|
||||
import net.corda.core.serialization.SerializeAsTokenContext
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.toObservable
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
@ -27,7 +24,6 @@ import net.corda.nodeapi.internal.serialization.CordaClassResolver
|
||||
import net.corda.nodeapi.internal.serialization.serializationContextKey
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Observable
|
||||
import java.io.InputStream
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.security.PrivateKey
|
||||
@ -46,39 +42,16 @@ import kotlin.reflect.jvm.isAccessible
|
||||
import kotlin.reflect.jvm.javaType
|
||||
|
||||
/**
|
||||
* Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead
|
||||
* simple, totally non-extensible binary (sub)format.
|
||||
*
|
||||
* This is NOT what should be used in any final platform product, rather, the final state should be a precisely
|
||||
* specified and standardised binary format with attention paid to anti-malleability, versioning and performance.
|
||||
* FIX SBE is a potential candidate: it prioritises performance over convenience and was designed for HFT. Google
|
||||
* Protocol Buffers with a minor tightening to make field reordering illegal is another possibility.
|
||||
*
|
||||
* FIX SBE:
|
||||
* https://real-logic.github.io/simple-binary-encoding/
|
||||
* http://mechanical-sympathy.blogspot.co.at/2014/05/simple-binary-encoding.html
|
||||
* Protocol buffers:
|
||||
* https://developers.google.com/protocol-buffers/
|
||||
*
|
||||
* But for now we use Kryo to maximise prototyping speed.
|
||||
*
|
||||
* Note that this code ignores *ALL* concerns beyond convenience, in particular it ignores:
|
||||
*
|
||||
* - Performance
|
||||
* - Security
|
||||
*
|
||||
* This code will happily deserialise literally anything, including malicious streams that would reconstruct classes
|
||||
* in invalid states, thus violating system invariants. It isn't designed to handle malicious streams and therefore,
|
||||
* isn't usable beyond the prototyping stage. But that's fine: we can revisit serialisation technologies later after
|
||||
* a formal evaluation process.
|
||||
*
|
||||
* We now distinguish between internal, storage related Kryo and external, network facing Kryo. We presently use
|
||||
* some non-whitelisted classes as part of internal storage.
|
||||
* TODO: eliminate internal, storage related whitelist issues, such as private keys in blob storage.
|
||||
* Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead
|
||||
* simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as
|
||||
* it will happily deserialise literally anything, including malicious streams that would reconstruct classes
|
||||
* in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is
|
||||
* absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have
|
||||
* the AMQP implementation.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure
|
||||
* A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure
|
||||
* type safety hack.
|
||||
*/
|
||||
object SerializedBytesSerializer : Serializer<SerializedBytes<Any>>() {
|
||||
@ -391,44 +364,6 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The Kryo used for the RPC wire protocol.
|
||||
*/
|
||||
// Every type in the wire protocol is listed here explicitly.
|
||||
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
|
||||
// because we can see everything we're using in one place.
|
||||
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
|
||||
init {
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
|
||||
// RPC specific classes
|
||||
register(InputStream::class.java, InputStreamSerializer)
|
||||
register(Observable::class.java, observableSerializer)
|
||||
register(CordaFuture::class,
|
||||
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() },
|
||||
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
|
||||
)
|
||||
}
|
||||
|
||||
override fun getRegistration(type: Class<*>): Registration {
|
||||
if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) {
|
||||
return super.getRegistration(Observable::class.java)
|
||||
}
|
||||
if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) {
|
||||
return super.getRegistration(InputStream::class.java)
|
||||
}
|
||||
if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) {
|
||||
return super.getRegistration(CordaFuture::class.java)
|
||||
}
|
||||
type.requireExternal("RPC not allowed to deserialise internal classes")
|
||||
return super.getRegistration(type)
|
||||
}
|
||||
|
||||
private fun Class<*>.requireExternal(msg: String) {
|
||||
require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" }
|
||||
}
|
||||
}
|
||||
|
||||
inline fun <T : Any> Kryo.register(
|
||||
type: KClass<T>,
|
||||
crossinline read: (Kryo, Input) -> T,
|
||||
|
@ -4,13 +4,13 @@ import com.google.common.collect.Maps;
|
||||
import net.corda.core.serialization.SerializationContext;
|
||||
import net.corda.core.serialization.SerializationFactory;
|
||||
import net.corda.core.serialization.SerializedBytes;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt;
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SchemaKt;
|
||||
import net.corda.testing.core.SerializationEnvironmentRule;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.NotSerializableException;
|
||||
import java.io.Serializable;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -33,20 +33,17 @@ public final class ForbiddenLambdaSerializationTests {
|
||||
@Test
|
||||
public final void serialization_fails_for_serializable_java_lambdas() {
|
||||
contexts.forEach(ctx -> {
|
||||
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(),
|
||||
SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(),
|
||||
this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null);
|
||||
String value = "Hey";
|
||||
Callable<String> target = (Callable<String> & Serializable) () -> value;
|
||||
|
||||
Throwable throwable = catchThrowable(() -> serialize(target, context));
|
||||
|
||||
assertThat(throwable).isNotNull();
|
||||
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
|
||||
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
|
||||
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE);
|
||||
} else {
|
||||
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
|
||||
}
|
||||
assertThat(throwable)
|
||||
.isNotNull()
|
||||
.isInstanceOf(NotSerializableException.class)
|
||||
.hasMessageContaining(getClass().getName());
|
||||
});
|
||||
}
|
||||
|
||||
@ -54,21 +51,17 @@ public final class ForbiddenLambdaSerializationTests {
|
||||
@SuppressWarnings("unchecked")
|
||||
public final void serialization_fails_for_not_serializable_java_lambdas() {
|
||||
contexts.forEach(ctx -> {
|
||||
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(),
|
||||
SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(),
|
||||
this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null);
|
||||
String value = "Hey";
|
||||
Callable<String> target = () -> value;
|
||||
|
||||
Throwable throwable = catchThrowable(() -> serialize(target, context));
|
||||
|
||||
assertThat(throwable).isNotNull();
|
||||
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
|
||||
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
|
||||
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
|
||||
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE);
|
||||
} else {
|
||||
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
|
||||
}
|
||||
assertThat(throwable)
|
||||
.isNotNull()
|
||||
.isInstanceOf(NotSerializableException.class)
|
||||
.hasMessageContaining(getClass().getName());
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -9,14 +9,12 @@ import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.createDevKeyStores
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
|
@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.KryoSerializable
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.google.common.primitives.Ints
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
@ -15,7 +16,6 @@ import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
@ -35,6 +35,17 @@ import java.time.Instant
|
||||
import java.util.*
|
||||
import kotlin.test.*
|
||||
|
||||
class TestScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient
|
||||
}
|
||||
|
||||
override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
}
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
companion object {
|
||||
@ -49,7 +60,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
|
||||
factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) }
|
||||
context = SerializationContextImpl(kryoMagic,
|
||||
javaClass.classLoader,
|
||||
AllWhitelist,
|
||||
@ -270,7 +281,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
}
|
||||
}
|
||||
Tmp()
|
||||
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
|
||||
val factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) }
|
||||
val context = SerializationContextImpl(kryoMagic,
|
||||
javaClass.classLoader,
|
||||
AllWhitelist,
|
||||
|
@ -1,7 +1,7 @@
|
||||
package net.corda.node.internal
|
||||
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.internal.Emoji
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
@ -384,9 +384,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
val classloader = cordappLoader.appClassLoader
|
||||
nodeSerializationEnv = SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps))
|
||||
registerScheme(KryoServerSerializationScheme() )
|
||||
},
|
||||
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
|
||||
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
|
||||
|
@ -39,8 +39,8 @@ class AMQPServerSerializationScheme(
|
||||
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return canDeserializeVersion(magic) &&
|
||||
( target == SerializationContext.UseCase.P2P
|
||||
|| target == SerializationContext.UseCase.Storage
|
||||
|| target == SerializationContext.UseCase.RPCServer)
|
||||
( target == SerializationContext.UseCase.P2P
|
||||
|| target == SerializationContext.UseCase.Storage
|
||||
|| target == SerializationContext.UseCase.RPCServer)
|
||||
}
|
||||
}
|
||||
|
@ -4,22 +4,15 @@ import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||
|
||||
class KryoServerSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient
|
||||
return magic == kryoMagic && target == SerializationContext.UseCase.Checkpoint
|
||||
}
|
||||
|
||||
override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
override fun rpcServerKryoPool(context: SerializationContext): KryoPool {
|
||||
return KryoPool.Builder {
|
||||
DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context), publicKeySerializer).apply {
|
||||
classLoader = context.deserializationClassLoader
|
||||
}
|
||||
}.build()
|
||||
}
|
||||
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
}
|
@ -1,87 +0,0 @@
|
||||
package net.corda.node.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.node.services.messaging.ObservableSubscription
|
||||
import net.corda.node.services.messaging.RPCServer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
|
||||
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
|
||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||
return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<*>>?): Observable<Any> {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
val observableId = Trace.InvocationId.newInstance()
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext
|
||||
output.writeInvocationId(observableId)
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
|
||||
// must be done again within the subscriber
|
||||
subscription = observable.materialize().subscribe(
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onError(exception: Throwable) {
|
||||
log.error("onError called in materialize()d RPC Observable", exception)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables != null) {
|
||||
observables.remove(observableId)
|
||||
if (observables.isEmpty()) {
|
||||
null
|
||||
} else {
|
||||
observables
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables == null) {
|
||||
hashSetOf(observableId)
|
||||
} else {
|
||||
observables.add(observableId)
|
||||
observables
|
||||
}
|
||||
}
|
||||
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||
}
|
||||
|
||||
private fun Output.writeInvocationId(id: Trace.InvocationId) {
|
||||
writeString(id.value)
|
||||
writeLong(id.timestamp.toEpochMilli())
|
||||
}
|
||||
}
|
@ -6,6 +6,11 @@ import net.corda.nodeapi.RPCApi
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
/**
|
||||
* An observable context is constructed on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by the serialization context. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
interface ObservableContextInterface {
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient)
|
||||
|
||||
|
@ -409,7 +409,7 @@ class RPCServer(
|
||||
|
||||
/*
|
||||
* We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be
|
||||
* same context is propagated by serialization context. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
inner class ObservableContext(
|
||||
|
@ -60,7 +60,8 @@ class RoundTripObservableSerializerTests {
|
||||
|
||||
@Test
|
||||
fun roundTripTest1() {
|
||||
val serializationScheme = AMQPRoundTripRPCSerializationScheme(serializationContext)
|
||||
val serializationScheme = AMQPRoundTripRPCSerializationScheme(
|
||||
serializationContext, emptySet(), ConcurrentHashMap())
|
||||
|
||||
// Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap,
|
||||
// the client as a property of the deserializer which, in the actual RPC client, is pulled off of
|
||||
|
@ -35,7 +35,7 @@ class RpcServerObservableSerializerTests {
|
||||
|
||||
@Test
|
||||
fun canSerializerBeRegistered() {
|
||||
val sf = SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist)
|
||||
val sf = SerializerFactory(AllWhitelist, javaClass.classLoader)
|
||||
|
||||
try {
|
||||
sf.register(RpcServerObservableSerializer())
|
||||
@ -68,10 +68,7 @@ class RpcServerObservableSerializerTests {
|
||||
deduplicationIdentity = "thisIsATest",
|
||||
clientAddress = SimpleString(testClientAddress))
|
||||
|
||||
val sf = SerializerFactory(
|
||||
cl = javaClass.classLoader,
|
||||
whitelist = AllWhitelist
|
||||
).apply {
|
||||
val sf = SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.internal.serialization.testutils
|
||||
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.serialization.ClassWhitelist
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||
@ -11,30 +12,30 @@ import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext
|
||||
|
||||
/**
|
||||
* Special serialization context for the round trip tests that allows for both server and client RPC
|
||||
* operations
|
||||
*/
|
||||
|
||||
|
||||
class AMQPRoundTripRPCSerializationScheme(
|
||||
private val serializationContext: SerializationContext,
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet())
|
||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
|
||||
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>)
|
||||
: AbstractAMQPSerializationScheme(
|
||||
cordappCustomSerializers
|
||||
cordappCustomSerializers, serializerFactoriesForContexts
|
||||
) {
|
||||
constructor(
|
||||
serializationContext: SerializationContext,
|
||||
cordapps: List<Cordapp>) : this(serializationContext, cordapps.customSerializers)
|
||||
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply {
|
||||
return SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcClientObservableSerializer)
|
||||
}
|
||||
}
|
||||
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply {
|
||||
return SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
|
||||
register(RpcServerObservableSerializer())
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +0,0 @@
|
||||
package net.corda.node.internal.serialization.testutils
|
||||
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactoryFactory
|
||||
|
||||
class TestSerializerFactoryFactory : SerializerFactoryFactory() {
|
||||
override fun make(context: SerializationContext): SerializerFactory {
|
||||
return super.make(context)
|
||||
}
|
||||
}
|
@ -4,10 +4,9 @@ import com.nhaarman.mockito_kotlin.doNothing
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||
import net.corda.core.serialization.internal.*
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
@ -31,10 +30,10 @@ fun <T> withoutTestSerialization(callable: () -> T): T { // TODO: Delete this, s
|
||||
|
||||
internal fun createTestSerializationEnv(label: String): SerializationEnvironmentImpl {
|
||||
val factory = SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||
registerScheme(AMQPServerSerializationScheme(emptyList()))
|
||||
// needed for checkpointing
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
}
|
||||
return object : SerializationEnvironmentImpl(
|
||||
factory,
|
||||
|
@ -2,7 +2,6 @@ package net.corda.demobench
|
||||
|
||||
import javafx.scene.image.Image
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.demobench.views.DemoBenchView
|
||||
@ -59,7 +58,6 @@ class DemoBench : App(DemoBenchView::class) {
|
||||
private fun initialiseSerialization() {
|
||||
nodeSerializationEnv = SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||
},
|
||||
AMQP_P2P_CONTEXT)
|
||||
|
@ -22,6 +22,7 @@ import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.util.io.Streams
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.lang.Thread.sleep
|
||||
import java.net.ConnectException
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.test.fail
|
||||
@ -55,7 +56,7 @@ class SSHServerTest {
|
||||
// The driver will automatically pick up the annotated flows below
|
||||
driver {
|
||||
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user),
|
||||
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
|
||||
customOverrides = mapOf("sshd" to mapOf("port" to 2222)) /*, startInSameProcess = true */)
|
||||
node.getOrThrow()
|
||||
|
||||
val session = JSch().getSession("u", "localhost", 2222)
|
||||
|
Loading…
Reference in New Issue
Block a user