[CORDA-481]: GH 965: Java 8 lambdas don't work properly in checkpointing (#1619)

This commit is contained in:
Michele Sollecito 2017-09-26 13:22:59 +01:00 committed by GitHub
parent 246ab26d30
commit 63168c0299
9 changed files with 195 additions and 5 deletions

View File

@ -10,6 +10,8 @@ UNRELEASED
Release 1.0
-----------
* Java 8 lambdas now work property with Kryo during check-pointing.
* String constants have been marked as ``const`` type in Kotlin, eliminating cases where functions of the form
``get<constant name>()`` were created for the Java API. These can now be referenced by their name directly.

View File

@ -38,6 +38,10 @@ It's reproduced here as an example of both ways you can do this for a couple of
.. note:: Several of the core interfaces at the heart of Corda are already annotated and so any classes that implement
them will automatically be whitelisted. This includes `Contract`, `ContractState` and `CommandData`.
.. warning:: Java 8 Lambda expressions are not serializable except in flow checkpoints, and then not by default. The syntax to declare a serializable Lambda
expression that will work with Corda is ``Runnable r = (Runnable & Serializable) () -> System.out.println("Hello World");``, or
``Callable<String> c = (Callable<String> & Serializable) () -> "Hello World";``.
.. warning:: We will be replacing the use of Kryo in the serialization framework and so additional changes here are
likely.

View File

@ -176,7 +176,7 @@ object TwoPartyDealFlow {
// What is the seller trying to sell us?
val autoOffer = handshake.payload
val deal = autoOffer.dealBeingOffered
logger.trace { "Got deal request for: ${deal.linearId.externalId!!}" }
logger.trace { "Got deal request for: ${deal.linearId.externalId}" }
return handshake.copy(payload = autoOffer.copy(dealBeingOffered = deal))
}

View File

@ -0,0 +1,29 @@
package net.corda.nodeapi.internal.serialization
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import java.io.Serializable
object CordaClosureSerializer : ClosureSerializer() {
val ERROR_MESSAGE = "Unable to serialize Java Lambda expression, unless explicitly declared e.g., Runnable r = (Runnable & Serializable) () -> System.out.println(\"Hello world!\");"
override fun write(kryo: Kryo, output: Output, target: Any) {
if (!isSerializable(target)) {
throw IllegalArgumentException(ERROR_MESSAGE)
}
super.write(kryo, output, target)
}
private fun isSerializable(target: Any): Boolean {
return target is Serializable
}
}
object CordaClosureBlacklistSerializer : ClosureSerializer() {
val ERROR_MESSAGE = "Java 8 Lambda expressions are not supported for serialization."
override fun write(kryo: Kryo, output: Output, target: Any) {
throw IllegalArgumentException(ERROR_MESSAGE)
}
}

View File

@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import com.esotericsoftware.kryo.serializers.FieldSerializer
import de.javakaffee.kryoserializers.ArraysAsListSerializer
@ -118,6 +119,9 @@ object DefaultKryoCustomizer {
// Used by the remote verifier, and will possibly be removed in future.
register(ContractAttachment::class.java, ContractAttachmentSerializer)
register(java.lang.invoke.SerializedLambda::class.java)
register(ClosureSerializer.Closure::class.java, CordaClosureBlacklistSerializer)
val customization = KryoSerializationCustomization(this)
pluginRegistries.forEach { it.customizeSerialization(customization) }
}

View File

@ -8,6 +8,7 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import net.corda.core.contracts.Attachment
@ -167,6 +168,7 @@ abstract class AbstractKryoSerializationScheme : SerializationScheme {
field.set(this, classResolver)
DefaultKryoCustomizer.customize(this)
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
classLoader = it.second
}
}.build()

View File

@ -0,0 +1,77 @@
package net.corda.nodeapi.internal.serialization;
import com.google.common.collect.Maps;
import net.corda.core.serialization.SerializationContext;
import net.corda.core.serialization.SerializationDefaults;
import net.corda.core.serialization.SerializationFactory;
import net.corda.core.serialization.SerializedBytes;
import net.corda.testing.TestDependencyInjectionBase;
import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.concurrent.Callable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.ThrowableAssert.catchThrowable;
public final class ForbiddenLambdaSerializationTests extends TestDependencyInjectionBase {
private SerializationFactory factory;
@Before
public void setup() {
factory = SerializationDefaults.INSTANCE.getSERIALIZATION_FACTORY();
}
@Test
public final void serialization_fails_for_serializable_java_lambdas() throws Exception {
EnumSet<SerializationContext.UseCase> contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint));
contexts.forEach(ctx -> {
SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
String value = "Hey";
Callable<String> target = (Callable<String> & Serializable) () -> value;
Throwable throwable = catchThrowable(() -> serialize(target, context));
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.INSTANCE.getERROR_MESSAGE());
} else {
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
}
});
}
@Test
@SuppressWarnings("unchecked")
public final void serialization_fails_for_not_serializable_java_lambdas() throws Exception {
EnumSet<SerializationContext.UseCase> contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint));
contexts.forEach(ctx -> {
SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
String value = "Hey";
Callable<String> target = () -> value;
Throwable throwable = catchThrowable(() -> serialize(target, context));
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.INSTANCE.getERROR_MESSAGE());
} else {
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
}
});
}
private <T> SerializedBytes<T> serialize(final T target, final SerializationContext context) {
return factory.serialize(target, context);
}
}

View File

@ -0,0 +1,61 @@
package net.corda.nodeapi.internal.serialization;
import com.google.common.collect.Maps;
import net.corda.core.serialization.SerializationContext;
import net.corda.core.serialization.SerializationDefaults;
import net.corda.core.serialization.SerializationFactory;
import net.corda.core.serialization.SerializedBytes;
import net.corda.testing.TestDependencyInjectionBase;
import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
import java.util.concurrent.Callable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.ThrowableAssert.catchThrowable;
public final class LambdaCheckpointSerializationTest extends TestDependencyInjectionBase {
private SerializationFactory factory;
private SerializationContext context;
@Before
public void setup() {
factory = SerializationDefaults.INSTANCE.getSERIALIZATION_FACTORY();
context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint);
}
@Test
@SuppressWarnings("unchecked")
public final void serialization_works_for_serializable_java_lambdas() throws Exception {
String value = "Hey";
Callable<String> target = (Callable<String> & Serializable) () -> value;
SerializedBytes<Callable<String>> serialized = serialize(target);
Callable<String> deserialized = deserialize(serialized, Callable.class);
assertThat(deserialized.call()).isEqualTo(value);
}
@Test
@SuppressWarnings("unchecked")
public final void serialization_fails_for_not_serializable_java_lambdas() throws Exception {
String value = "Hey";
Callable<String> target = () -> value;
Throwable throwable = catchThrowable(() -> serialize(target));
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
assertThat(throwable).hasMessage(CordaClosureSerializer.INSTANCE.getERROR_MESSAGE());
}
private <T> SerializedBytes<T> serialize(final T target) {
return factory.serialize(target, context);
}
private <T> T deserialize(final SerializedBytes<? extends T> bytes, final Class<T> type) {
return factory.deserialize(bytes, type, context);
}
}

View File

@ -10,8 +10,13 @@ import com.opengamma.strata.pricer.rate.ImmutableRatesProvider
import com.opengamma.strata.pricer.swap.DiscountingSwapProductPricer
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.*
import net.corda.core.flows.AbstractStateReplacementFlow.Proposal
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateReplacementException
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
@ -21,7 +26,13 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
import net.corda.finance.flows.TwoPartyDealFlow
import net.corda.vega.analytics.*
import net.corda.vega.analytics.BimmAnalysisUtils
import net.corda.vega.analytics.InitialMarginTriple
import net.corda.vega.analytics.IsdaConfiguration
import net.corda.vega.analytics.OGSIMMAnalyticsEngine
import net.corda.vega.analytics.PortfolioNormalizer
import net.corda.vega.analytics.RwamBimmNotProductClassesCalculator
import net.corda.vega.analytics.compareIMTriples
import net.corda.vega.contracts.IRSState
import net.corda.vega.contracts.PortfolioState
import net.corda.vega.contracts.PortfolioValuation
@ -30,6 +41,8 @@ import net.corda.vega.portfolio.Portfolio
import net.corda.vega.portfolio.toPortfolio
import java.time.LocalDate
private val calibrator = CurveCalibrator.of(1e-9, 1e-9, 100, CalibrationMeasures.PAR_SPREAD)
/**
* The Simm Flow is between two parties that both agree on a portfolio of trades to run valuations on. Both sides
* will independently value the portfolio using a SIMM implementation and then come to consensus over those valuations.
@ -134,7 +147,6 @@ object SimmFlow {
val pricer = DiscountingSwapProductPricer.DEFAULT
val OGTrades = portfolio.swaps.map { it -> it.toFixedLeg().resolve(referenceData) }
val calibrator = CurveCalibrator.of(1e-9, 1e-9, 100, CalibrationMeasures.PAR_SPREAD)
val ratesProvider = calibrator.calibrate(curveGroup, marketData, ReferenceData.standard())
val fxRateProvider = MarketDataFxRateProvider.of(marketData)
@ -252,7 +264,6 @@ object SimmFlow {
val pricer = DiscountingSwapProductPricer.DEFAULT
val OGTrades = portfolio.swaps.map { it -> it.toFixedLeg().resolve(referenceData) }
val calibrator = CurveCalibrator.of(1e-9, 1e-9, 100, CalibrationMeasures.PAR_SPREAD)
val ratesProvider = calibrator.calibrate(curveGroup, marketData, ReferenceData.standard())
val fxRateProvider = MarketDataFxRateProvider.of(marketData)