From 0c3a30edc82b71699fc1aaf349418626639508b1 Mon Sep 17 00:00:00 2001 From: Kat Baker Date: Thu, 10 May 2018 21:21:42 +0100 Subject: [PATCH] Corda-847 - Remove Kryo for RPC It's no longer used as we've switched over to AMQP for RPC calls so remove it from everywhere and only use it for checkpointing * Wire up demo bench post Kryo removal * Test Fixes * rebase and fix tests * Test Fix * wip * revert changes to api now we don't need to add annotations --- .ci/api-current.txt | 14 --- .../kryo/KryoClientSerializationScheme.kt | 55 ------------ .../kryo/RpcClientObservableSerializer.kt | 78 ----------------- .../corda/core/utilities/KotlinUtilsTest.kt | 24 +++-- docs/source/changelog.rst | 4 + .../serialization/SerializationScheme.kt | 3 +- .../serialization/amqp/SerializerFactory.kt | 6 +- .../internal/serialization/kryo/Kryo.kt | 79 ++--------------- .../ForbiddenLambdaSerializationTests.java | 31 +++---- .../internal/crypto/X509UtilitiesTest.kt | 2 - .../internal/serialization/kryo/KryoTests.kt | 17 +++- .../kotlin/net/corda/node/internal/Node.kt | 6 +- .../amqp/AMQPServerSerializationScheme.kt | 6 +- .../kryo/KryoServerSerializationScheme.kt | 13 +-- .../kryo/RpcServerObservableSerializer.kt | 87 ------------------- .../messaging/ObservableContextInterface.kt | 5 ++ .../node/services/messaging/RPCServer.kt | 2 +- .../RoundTripObservableSerializerTests.kt | 3 +- .../RpcServerObservableSerializerTests.kt | 7 +- .../testutils/AMQPTestSerialiationScheme.kt | 17 ++-- .../testutils/TestSerializerFactoryFactory.kt | 11 --- .../InternalSerializationTestHelpers.kt | 7 +- .../kotlin/net/corda/demobench/DemoBench.kt | 2 - .../net/corda/tools/shell/SSHServerTest.kt | 3 +- 24 files changed, 96 insertions(+), 386 deletions(-) delete mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt delete mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt delete mode 100644 node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt delete mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 107c0b060f..b088f61edb 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -76,16 +76,10 @@ public final class net.corda.core.concurrent.ConcurrencyUtils extends java.lang. @NotNull public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:" ## -<<<<<<< HEAD public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future public abstract void then(kotlin.jvm.functions.Function1, ? extends W>) @NotNull public abstract java.util.concurrent.CompletableFuture toCompletableFuture() -======= -@net.corda.core.serialization.CordaSerializable public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future - public abstract void then(kotlin.jvm.functions.Function1) - @org.jetbrains.annotations.NotNull public abstract concurrent.CompletableFuture toCompletableFuture() ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation ## @CordaSerializable public final class net.corda.core.context.Actor extends java.lang.Object @@ -2621,12 +2615,8 @@ public final class net.corda.core.messaging.DataFeed extends java.lang.Object public int hashCode() public String toString() ## -<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable -======= -@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract net.corda.core.flows.StateMachineRunId getId() @@ -2652,12 +2642,8 @@ public final class net.corda.core.messaging.FlowHandleImpl extends java.lang.Obj public int hashCode() public String toString() ## -<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle -======= -@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract rx.Observable getProgress() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt deleted file mode 100644 index 5a5397ca6d..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt +++ /dev/null @@ -1,55 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.pool.KryoPool -import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.internal.SerializationEnvironment -import net.corda.core.serialization.internal.SerializationEnvironmentImpl -import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo - -private val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic, - net.corda.core.serialization.SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCClient, - null) - -class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) - } - - override fun rpcClientKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } - - // We're on the client and don't have access to server classes. - override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - - companion object { - /** Call from main only. */ - fun initialiseSerialization(classLoader: ClassLoader? = null) { - nodeSerializationEnv = createSerializationEnv(classLoader) - } - - fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment { - return SerializationEnvironmentImpl( - SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(AMQPClientSerializationScheme(emptyList())) - }, - if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT, - rpcClientContext = if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) - } - } -} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt deleted file mode 100644 index c69081a667..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt +++ /dev/null @@ -1,78 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.client.rpc.internal.ObservableContext -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.RPCApi -import rx.Notification -import rx.Observable -import rx.subjects.UnicastSubject -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger - -/** - * A [Serializer] to deserialize Observables once the corresponding Kryo instance has been provided with an [ObservableContext]. - */ -object RpcClientObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - fun createContext( - serializationContext: SerializationContext, - observableContext: ObservableContext - ): SerializationContext { - return serializationContext.withProperty(RpcObservableContextKey, observableContext) - } - - private fun pinInSubscriptions(observable: Observable, hardReferenceStore: MutableSet>): Observable { - val refCount = AtomicInteger(0) - return observable.doOnSubscribe { - if (refCount.getAndIncrement() == 0) { - require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" } - } - }.doOnUnsubscribe { - if (refCount.decrementAndGet() == 0) { - require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" } - } - } - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Observable { - val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext - val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.") - val observable = UnicastSubject.create>() - require(observableContext.observableMap.getIfPresent(observableId) == null) { - "Multiple Observables arrived with the same ID $observableId" - } - val rpcCallSite = getRpcCallSite(kryo, observableContext) - observableContext.observableMap.put(observableId, observable) - observableContext.callSiteMap?.put(observableId, rpcCallSite) - // We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users - // don't need to store a reference to the Observables themselves. - return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe { - // This causes Future completions to give warnings because the corresponding OnComplete sent from the server - // will arrive after the client unsubscribes from the observable and consequently invalidates the mapping. - // The unsubscribe is due to [ObservableToFuture]'s use of first(). - observableContext.observableMap.invalidate(observableId) - }.dematerialize() - } - - private fun Input.readInvocationId(): Trace.InvocationId? { - - val value = readString() ?: return null - val timestamp = readLong() - return Trace.InvocationId(value, Instant.ofEpochMilli(timestamp)) - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - throw UnsupportedOperationException("Cannot serialise Observables on the client side") - } - - private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? { - val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId - return observableContext.callSiteMap?.get(rpcRequestOrObservableId) - } -} \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt index f725babdf2..a62ec2333d 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt @@ -2,16 +2,20 @@ package net.corda.core.utilities import com.esotericsoftware.kryo.KryoException import net.corda.core.crypto.random63BitValue -import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize +import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT +import net.corda.nodeapi.internal.serialization.SerializationContextImpl +import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat import org.junit.Rule import org.junit.Test import org.junit.rules.ExpectedException +object EmptyWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = false +} + class KotlinUtilsTest { @Rule @JvmField @@ -20,6 +24,14 @@ class KotlinUtilsTest { @Rule val expectedEx: ExpectedException = ExpectedException.none() + val KRYO_CHECKPOINT_NOWHITELIST_CONTEXT = SerializationContextImpl(kryoMagic, + SerializationDefaults.javaClass.classLoader, + EmptyWhitelist, + emptyMap(), + true, + SerializationContext.UseCase.Checkpoint, + null) + @Test fun `transient property which is null`() { val test = NullTransientProperty() @@ -43,7 +55,7 @@ class KotlinUtilsTest { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") val original = NonCapturingTransientProperty() - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } @Test @@ -61,8 +73,10 @@ class KotlinUtilsTest { fun `deserialise transient property with capturing lambda`() { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") + val original = CapturingTransientProperty("Hello") - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } private class NullTransientProperty { diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 331b6b5158..51e3d5bd89 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,10 @@ release, see :doc:`upgrade-notes`. Unreleased ========== + +* RPC Framework moved from Kryo to the Corda AMQP implementation [Corda-847]. This completes the removal + of ``Kryo`` from general use within Corda, remaining only for use in flow checkpointing. + * Set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode. * Node will now gracefully fail to start if one of the required ports is already in use. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index 2163459104..7e660ca78e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -114,7 +114,8 @@ open class SerializationFactoryImpl( val lookupKey = magic to target return schemes.computeIfAbsent(lookupKey) { registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single? - logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes") + logger.warn("Cannot find serialization scheme for: [$lookupKey, " + + "${if (magic == amqpMagic) "AMQP" else if (magic == kryoMagic) "Kryo" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes") throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.") } to magic } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt index 8a56baf1b0..1ea7daf020 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt @@ -285,7 +285,11 @@ open class SerializerFactory( } private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer = serializersByType.computeIfAbsent(type) { - if (isPrimitive(clazz)) { + if (clazz.isSynthetic) { + // Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also + // captures Lambda expressions and other anonymous functions + throw NotSerializableException(type.typeName) + } else if (isPrimitive(clazz)) { AMQPPrimitiveSerializer(clazz) } else { findCustomSerializer(clazz, declaredType) ?: run { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt index 7a5111fd93..df8c3814ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt @@ -7,7 +7,6 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer import com.esotericsoftware.kryo.util.MapReferenceResolver -import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash @@ -18,8 +17,6 @@ import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint import net.corda.core.serialization.SerializationContext.UseCase.Storage import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SerializedBytes -import net.corda.core.toFuture -import net.corda.core.toObservable import net.corda.core.transactions.* import net.corda.core.utilities.OpaqueBytes import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -27,7 +24,6 @@ import net.corda.nodeapi.internal.serialization.CordaClassResolver import net.corda.nodeapi.internal.serialization.serializationContextKey import org.slf4j.Logger import org.slf4j.LoggerFactory -import rx.Observable import java.io.InputStream import java.lang.reflect.InvocationTargetException import java.security.PrivateKey @@ -46,39 +42,16 @@ import kotlin.reflect.jvm.isAccessible import kotlin.reflect.jvm.javaType /** - * Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead - * simple, totally non-extensible binary (sub)format. - * - * This is NOT what should be used in any final platform product, rather, the final state should be a precisely - * specified and standardised binary format with attention paid to anti-malleability, versioning and performance. - * FIX SBE is a potential candidate: it prioritises performance over convenience and was designed for HFT. Google - * Protocol Buffers with a minor tightening to make field reordering illegal is another possibility. - * - * FIX SBE: - * https://real-logic.github.io/simple-binary-encoding/ - * http://mechanical-sympathy.blogspot.co.at/2014/05/simple-binary-encoding.html - * Protocol buffers: - * https://developers.google.com/protocol-buffers/ - * - * But for now we use Kryo to maximise prototyping speed. - * - * Note that this code ignores *ALL* concerns beyond convenience, in particular it ignores: - * - * - Performance - * - Security - * - * This code will happily deserialise literally anything, including malicious streams that would reconstruct classes - * in invalid states, thus violating system invariants. It isn't designed to handle malicious streams and therefore, - * isn't usable beyond the prototyping stage. But that's fine: we can revisit serialisation technologies later after - * a formal evaluation process. - * - * We now distinguish between internal, storage related Kryo and external, network facing Kryo. We presently use - * some non-whitelisted classes as part of internal storage. - * TODO: eliminate internal, storage related whitelist issues, such as private keys in blob storage. + * Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead + * simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as + * it will happily deserialise literally anything, including malicious streams that would reconstruct classes + * in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is + * absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have + * the AMQP implementation. */ /** - * A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure + * A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure * type safety hack. */ object SerializedBytesSerializer : Serializer>() { @@ -391,44 +364,6 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe } } -/** - * The Kryo used for the RPC wire protocol. - */ -// Every type in the wire protocol is listed here explicitly. -// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, -// because we can see everything we're using in one place. -class RPCKryo(observableSerializer: Serializer>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) { - init { - DefaultKryoCustomizer.customize(this) - - // RPC specific classes - register(InputStream::class.java, InputStreamSerializer) - register(Observable::class.java, observableSerializer) - register(CordaFuture::class, - read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() }, - write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) } - ) - } - - override fun getRegistration(type: Class<*>): Registration { - if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) { - return super.getRegistration(Observable::class.java) - } - if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) { - return super.getRegistration(InputStream::class.java) - } - if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) { - return super.getRegistration(CordaFuture::class.java) - } - type.requireExternal("RPC not allowed to deserialise internal classes") - return super.getRegistration(type) - } - - private fun Class<*>.requireExternal(msg: String) { - require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" } - } -} - inline fun Kryo.register( type: KClass, crossinline read: (Kryo, Input) -> T, diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java index db8da10b45..8ae47ea391 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java @@ -4,13 +4,13 @@ import com.google.common.collect.Maps; import net.corda.core.serialization.SerializationContext; import net.corda.core.serialization.SerializationFactory; import net.corda.core.serialization.SerializedBytes; -import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer; -import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt; +import net.corda.nodeapi.internal.serialization.amqp.SchemaKt; import net.corda.testing.core.SerializationEnvironmentRule; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.io.NotSerializableException; import java.io.Serializable; import java.util.EnumSet; import java.util.concurrent.Callable; @@ -33,20 +33,17 @@ public final class ForbiddenLambdaSerializationTests { @Test public final void serialization_fails_for_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = (Callable & Serializable) () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } @@ -54,21 +51,17 @@ public final class ForbiddenLambdaSerializationTests { @SuppressWarnings("unchecked") public final void serialization_fails_for_not_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5508e857d9..ed9345c00e 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -9,14 +9,12 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.node.serialization.amqp.AMQPServerSerializationScheme -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.SerializationContextImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.amqpMagic -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.TestIdentity diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index 860c7b64c0..8607d04d8c 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.KryoSerializable import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.primitives.Ints import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever @@ -15,7 +16,6 @@ import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.ALICE_NAME @@ -35,6 +35,17 @@ import java.time.Instant import java.util.* import kotlin.test.* +class TestScheme : AbstractKryoSerializationScheme() { + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + } + + override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + +} + @RunWith(Parameterized::class) class KryoTests(private val compression: CordaSerializationEncoding?) { companion object { @@ -49,7 +60,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { @Before fun setup() { - factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, @@ -270,7 +281,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { } } Tmp() - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + val factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } val context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 26fa515e6d..944faf6579 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -1,7 +1,7 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.Emoji import net.corda.core.internal.concurrent.openFuture @@ -384,9 +384,9 @@ open class Node(configuration: NodeConfiguration, val classloader = cordappLoader.appClassLoader nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps)) - registerScheme(KryoClientSerializationScheme()) + registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps)) + registerScheme(KryoServerSerializationScheme() ) }, p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt index b645b1289d..dc9ea63428 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt @@ -39,8 +39,8 @@ class AMQPServerSerializationScheme( override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { return canDeserializeVersion(magic) && - ( target == SerializationContext.UseCase.P2P - || target == SerializationContext.UseCase.Storage - || target == SerializationContext.UseCase.RPCServer) + ( target == SerializationContext.UseCase.P2P + || target == SerializationContext.UseCase.Storage + || target == SerializationContext.UseCase.RPCServer) } } diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt index acf4c7f461..83eec03995 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt @@ -4,22 +4,15 @@ import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo class KryoServerSerializationScheme : AbstractKryoSerializationScheme() { override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + return magic == kryoMagic && target == SerializationContext.UseCase.Checkpoint } override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - override fun rpcServerKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt deleted file mode 100644 index a51448e83a..0000000000 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt +++ /dev/null @@ -1,87 +0,0 @@ -package net.corda.node.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.node.services.messaging.ObservableSubscription -import net.corda.node.services.messaging.RPCServer -import net.corda.nodeapi.RPCApi -import org.slf4j.LoggerFactory -import rx.Notification -import rx.Observable -import rx.Subscriber - -object RpcServerObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - private val log = LoggerFactory.getLogger(javaClass) - - fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { - return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) - } - - override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { - throw UnsupportedOperationException() - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - val observableId = Trace.InvocationId.newInstance() - val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext - output.writeInvocationId(observableId) - val observableWithSubscription = ObservableSubscription( - // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing - // must be done again within the subscriber - subscription = observable.materialize().subscribe( - object : Subscriber>() { - override fun onNext(observation: Notification<*>) { - if (!isUnsubscribed) { - val message = RPCApi.ServerToClient.Observation( - id = observableId, - content = observation, - deduplicationIdentity = observableContext.deduplicationIdentity - ) - observableContext.sendMessage(message) - } - } - - override fun onError(exception: Throwable) { - log.error("onError called in materialize()d RPC Observable", exception) - } - - override fun onCompleted() { - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables != null) { - observables.remove(observableId) - if (observables.isEmpty()) { - null - } else { - observables - } - } else { - null - } - } - } - } - ) - ) - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables == null) { - hashSetOf(observableId) - } else { - observables.add(observableId) - observables - } - } - observableContext.observableMap.put(observableId, observableWithSubscription) - } - - private fun Output.writeInvocationId(id: Trace.InvocationId) { - writeString(id.value) - writeLong(id.timestamp.toEpochMilli()) - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt index ce0521d686..32adde2e64 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt @@ -6,6 +6,11 @@ import net.corda.nodeapi.RPCApi import org.apache.activemq.artemis.api.core.SimpleString import java.util.concurrent.ConcurrentHashMap +/** + * An observable context is constructed on each RPC request. If subsequently a nested Observable is encountered this + * same context is propagated by the serialization context. This way all observations rooted in a single RPC will be + * muxed correctly. Note that the context construction itself is quite cheap. + */ interface ObservableContextInterface { fun sendMessage(serverToClient: RPCApi.ServerToClient) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 7931bd158b..4574afffef 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -409,7 +409,7 @@ class RPCServer( /* * We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this - * same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be + * same context is propagated by serialization context. This way all observations rooted in a single RPC will be * muxed correctly. Note that the context construction itself is quite cheap. */ inner class ObservableContext( diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt index e661007909..4467a271d6 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -60,7 +60,8 @@ class RoundTripObservableSerializerTests { @Test fun roundTripTest1() { - val serializationScheme = AMQPRoundTripRPCSerializationScheme(serializationContext) + val serializationScheme = AMQPRoundTripRPCSerializationScheme( + serializationContext, emptySet(), ConcurrentHashMap()) // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, // the client as a property of the deserializer which, in the actual RPC client, is pulled off of diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt index 17fa31d95c..6778a2ef91 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt @@ -35,7 +35,7 @@ class RpcServerObservableSerializerTests { @Test fun canSerializerBeRegistered() { - val sf = SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist) + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader) try { sf.register(RpcServerObservableSerializer()) @@ -68,10 +68,7 @@ class RpcServerObservableSerializerTests { deduplicationIdentity = "thisIsATest", clientAddress = SimpleString(testClientAddress)) - val sf = SerializerFactory( - cl = javaClass.classLoader, - whitelist = AllWhitelist - ).apply { + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcServerObservableSerializer()) } diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt index 70ef79cc8b..73a48147ba 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -3,6 +3,7 @@ package net.corda.node.internal.serialization.testutils import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.core.context.Trace import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationCustomSerializer import net.corda.node.serialization.amqp.RpcServerObservableSerializer @@ -11,30 +12,30 @@ import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import java.util.concurrent.ConcurrentHashMap import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext /** * Special serialization context for the round trip tests that allows for both server and client RPC * operations */ + + class AMQPRoundTripRPCSerializationScheme( private val serializationContext: SerializationContext, - cordappCustomSerializers: Set> = emptySet()) + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory>) : AbstractAMQPSerializationScheme( - cordappCustomSerializers + cordappCustomSerializers, serializerFactoriesForContexts ) { - constructor( - serializationContext: SerializationContext, - cordapps: List) : this(serializationContext, cordapps.customSerializers) - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcClientObservableSerializer) } } override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcServerObservableSerializer()) } } diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt deleted file mode 100644 index a1c20aae53..0000000000 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt +++ /dev/null @@ -1,11 +0,0 @@ -package net.corda.node.internal.serialization.testutils - -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory -import net.corda.nodeapi.internal.serialization.amqp.SerializerFactoryFactory - -class TestSerializerFactoryFactory : SerializerFactoryFactory() { - override fun make(context: SerializationContext): SerializerFactory { - return super.make(context) - } -} \ No newline at end of file diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index eaebc3b134..abc75b4eba 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -4,10 +4,9 @@ import com.nhaarman.mockito_kotlin.doNothing import com.nhaarman.mockito_kotlin.whenever import net.corda.core.DoNotImplement import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.* -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.serialization.amqp.AMQPServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.SerializationEnvironmentRule import java.util.concurrent.ConcurrentHashMap @@ -31,10 +30,10 @@ fun withoutTestSerialization(callable: () -> T): T { // TODO: Delete this, s internal fun createTestSerializationEnv(label: String): SerializationEnvironmentImpl { val factory = SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) registerScheme(AMQPServerSerializationScheme(emptyList())) + // needed for checkpointing + registerScheme(KryoServerSerializationScheme()) } return object : SerializationEnvironmentImpl( factory, diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt index 2a76aa1fb3..4b909ed4f6 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt @@ -2,7 +2,6 @@ package net.corda.demobench import javafx.scene.image.Image import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.demobench.views.DemoBenchView @@ -59,7 +58,6 @@ class DemoBench : App(DemoBenchView::class) { private fun initialiseSerialization() { nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) }, AMQP_P2P_CONTEXT) diff --git a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt index 919acd78cf..9aceb2cff2 100644 --- a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt +++ b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt @@ -22,6 +22,7 @@ import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.util.io.Streams import org.junit.Ignore import org.junit.Test +import java.lang.Thread.sleep import java.net.ConnectException import kotlin.test.assertTrue import kotlin.test.fail @@ -55,7 +56,7 @@ class SSHServerTest { // The driver will automatically pick up the annotated flows below driver { val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), - customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + customOverrides = mapOf("sshd" to mapOf("port" to 2222)) /*, startInSameProcess = true */) node.getOrThrow() val session = JSch().getSession("u", "localhost", 2222)