ENT-1463: Prepare node-api for determination. (#3080)

* Prepare node-api for determination.
* Disentangle Kryo and AMQP classes.
* Add version properties for fast-classpath-scanner, proton-j and snappy.
* Remove String.jvm extension function.
* Refactor Cordapp reference out of AMQP serialisers' primary constructors.
This commit is contained in:
Chris Rankin 2018-05-09 13:37:04 +01:00 committed by GitHub
parent be11da76c8
commit 781b50642a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 149 additions and 92 deletions

View File

@ -21,7 +21,7 @@ buildscript {
// TODO: Upgrade gradle-capsule-plugin to a version with capsule:1.0.3
ext.capsule_version = '1.0.1'
ext.asm_version = '0.5.3'
ext.asm_version = '5.0.4'
/*
* TODO Upgrade to version 2.4 for large message streaming support
@ -77,6 +77,9 @@ buildscript {
ext.eaagentloader_version = '1.0.3'
ext.jsch_version = '0.1.54'
ext.commons_cli_version = '1.4'
ext.protonj_version = '0.27.1'
ext.snappy_version = '0.4'
ext.fast_classpath_scanner_version = '2.12.3'
// Update 121 is required for ObjectInputFilter and at time of writing 131 was latest:
ext.java8_minUpdateVersion = '131'

View File

@ -33,15 +33,16 @@ dependencies {
// Kryo: object graph serialization.
compile "com.esotericsoftware:kryo:4.0.0"
compile "de.javakaffee:kryo-serializers:0.41"
compile "org.ow2.asm:asm:$asm_version"
// For AMQP serialisation.
compile "org.apache.qpid:proton-j:0.27.1"
compile "org.apache.qpid:proton-j:$protonj_version"
// FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper
compile 'io.github.lukehutch:fast-classpath-scanner:2.12.3'
// FastClasspathScanner: classpath scanning - needed for the NetworkBootstrapper and AMQP.
compile "io.github.lukehutch:fast-classpath-scanner:$fast_classpath_scanner_version"
// Pure-Java Snappy compression
compile 'org.iq80.snappy:snappy:0.4'
compile "org.iq80.snappy:snappy:$snappy_version"
// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"

View File

@ -0,0 +1,63 @@
@file:JvmName("ByteBufferStreams")
package net.corda.nodeapi.internal.serialization
import net.corda.core.internal.LazyPool
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.nio.ByteBuffer
import kotlin.math.min
internal val serializeOutputStreamPool = LazyPool(
clear = ByteBufferOutputStream::reset,
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
newInstance = { ByteBufferOutputStream(64 * 1024) })
internal fun <T> byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray {
return serializeOutputStreamPool.run { underlying ->
task(underlying)
underlying.toByteArray() // Must happen after close, to allow ZIP footer to be written for example.
}
}
class ByteBufferInputStream(val byteBuffer: ByteBuffer) : InputStream() {
@Throws(IOException::class)
override fun read(): Int {
return if (byteBuffer.hasRemaining()) byteBuffer.get().toInt() else -1
}
@Throws(IOException::class)
override fun read(b: ByteArray, offset: Int, length: Int): Int {
if (offset < 0 || length < 0 || length > b.size - offset) {
throw IndexOutOfBoundsException()
} else if (length == 0) {
return 0
} else if (!byteBuffer.hasRemaining()) {
return -1
}
val size = min(length, byteBuffer.remaining())
byteBuffer.get(b, offset, size)
return size
}
}
class ByteBufferOutputStream(size: Int) : ByteArrayOutputStream(size) {
companion object {
private val ensureCapacity = ByteArrayOutputStream::class.java.getDeclaredMethod("ensureCapacity", Int::class.java).apply {
isAccessible = true
}
}
fun <T> alsoAsByteBuffer(remaining: Int, task: (ByteBuffer) -> T): T {
ensureCapacity.invoke(this, count + remaining)
val buffer = ByteBuffer.wrap(buf, count, remaining)
val result = task(buffer)
count = buffer.position()
return result
}
fun copyTo(stream: OutputStream) {
stream.write(buf, 0, count)
}
}

View File

@ -16,7 +16,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutionException
val attachmentsClassLoaderEnabledPropertyName = "attachments.class.loader.enabled"
const val attachmentsClassLoaderEnabledPropertyName = "attachments.class.loader.enabled"
internal object NullEncodingWhitelist : EncodingWhitelist {
override fun acceptEncoding(encoding: SerializationEncoding) = false
@ -30,7 +30,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
override val useCase: SerializationContext.UseCase,
override val encoding: SerializationEncoding?,
override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) : SerializationContext {
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
private val builder = AttachmentsClassLoaderBuilder(properties, deserializationClassLoader)
/**
* {@inheritDoc}
@ -39,23 +39,8 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
*/
override fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext {
properties[attachmentsClassLoaderEnabledPropertyName] as? Boolean == true || return this
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContextImpl
?: return this // Some tests don't set one.
try {
return withClassLoader(cache.get(attachmentHashes) {
val missing = ArrayList<SecureHash>()
val attachments = ArrayList<Attachment>()
attachmentHashes.forEach { id ->
serializationContext.serviceHub.attachments.openAttachment(id)?.let { attachments += it }
?: run { missing += id }
}
missing.isNotEmpty() && throw MissingAttachmentsException(missing)
AttachmentsClassLoader(attachments, parent = deserializationClassLoader)
}!!)
} catch (e: ExecutionException) {
// Caught from within the cache get, so unwrap.
throw e.cause!!
}
val classLoader = builder.build(attachmentHashes) ?: return this
return withClassLoader(classLoader)
}
override fun withProperty(property: Any, value: Any): SerializationContext {
@ -80,6 +65,33 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding)
}
/*
* This class is internal rather than private so that node-api-deterministic
* can replace it with an alternative version.
*/
internal class AttachmentsClassLoaderBuilder(private val properties: Map<Any, Any>, private val deserializationClassLoader: ClassLoader) {
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
fun build(attachmentHashes: List<SecureHash>): AttachmentsClassLoader? {
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContext ?: return null // Some tests don't set one.
try {
return cache.get(attachmentHashes) {
val missing = ArrayList<SecureHash>()
val attachments = ArrayList<Attachment>()
attachmentHashes.forEach { id ->
serializationContext.serviceHub.attachments.openAttachment(id)?.let { attachments += it }
?: run { missing += id }
}
missing.isNotEmpty() && throw MissingAttachmentsException(missing)
AttachmentsClassLoader(attachments, parent = deserializationClassLoader)
}!!
} catch (e: ExecutionException) {
// Caught from within the cache get, so unwrap.
throw e.cause!!
}
}
}
open class SerializationFactoryImpl : SerializationFactory() {
companion object {
val magicSize = sequenceOf(kryoMagic, amqpMagic).map { it.size }.distinct().single()
@ -152,4 +164,4 @@ interface SerializationScheme {
@Throws(NotSerializableException::class)
fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T>
}
}

View File

@ -35,9 +35,11 @@ open class SerializerFactoryFactory {
}
abstract class AbstractAMQPSerializationScheme(
val cordappLoader: List<Cordapp>,
private val cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
val sff: SerializerFactoryFactory = SerializerFactoryFactory()
) : SerializationScheme {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers)
// TODO: This method of initialisation for the Whitelist and plugin serializers will have to change
// when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way
companion object {
@ -62,6 +64,8 @@ abstract class AbstractAMQPSerializationScheme(
.map { it.kotlin.objectOrNewInstance() }
}
}
val List<Cordapp>.customSerializers get() = flatMap { it.serializationCustomSerializers }.toSet()
}
private fun registerCustomSerializers(context: SerializationContext, factory: SerializerFactory) {
@ -103,15 +107,13 @@ abstract class AbstractAMQPSerializationScheme(
// If we're passed in an external list we trust that, otherwise revert to looking at the scan of the
// classpath to find custom serializers.
if (cordappLoader.isEmpty()) {
if (cordappCustomSerializers.isEmpty()) {
for (customSerializer in customSerializers) {
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
} else {
cordappLoader.forEach { loader ->
for (customSerializer in loader.serializationCustomSerializers) {
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
cordappCustomSerializers.forEach { customSerializer ->
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
}
@ -154,13 +156,16 @@ abstract class AbstractAMQPSerializationScheme(
}
// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented
class AMQPServerSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
class AMQPServerSerializationScheme(cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet())
: AbstractAMQPSerializationScheme(cordappCustomSerializers) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers)
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
throw UnsupportedOperationException()
}
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
@ -171,9 +176,12 @@ class AMQPServerSerializationScheme(cordapps: List<Cordapp> = emptyList()) : Abs
}
// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented
class AMQPClientSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
class AMQPClientSerializationScheme(cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>> = emptySet())
: AbstractAMQPSerializationScheme(cordappCustomSerializers) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers)
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
throw UnsupportedOperationException()
}
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {

View File

@ -1,8 +1,9 @@
@file:JvmName("AMQPStreams")
package net.corda.nodeapi.internal.serialization.amqp
import com.esotericsoftware.kryo.io.ByteBufferInputStream
import net.corda.nodeapi.internal.serialization.kryo.ByteBufferOutputStream
import net.corda.nodeapi.internal.serialization.kryo.serializeOutputStreamPool
import net.corda.nodeapi.internal.serialization.ByteBufferInputStream
import net.corda.nodeapi.internal.serialization.ByteBufferOutputStream
import net.corda.nodeapi.internal.serialization.serializeOutputStreamPool
import java.io.InputStream
import java.io.OutputStream
import java.nio.ByteBuffer

View File

@ -37,7 +37,7 @@ class DeserializationInput @JvmOverloads constructor(private val serializerFacto
private val objectHistory: MutableList<Any> = mutableListOf()
companion object {
private val BYTES_NEEDED_TO_PEEK: Int = 23
private const val BYTES_NEEDED_TO_PEEK: Int = 23
fun peekSize(bytes: ByteArray): Int {
// There's an 8 byte header, and then a 0 byte plus descriptor followed by constructor

View File

@ -5,7 +5,7 @@ import net.corda.core.serialization.SerializationEncoding
import net.corda.core.serialization.SerializedBytes
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding
import net.corda.nodeapi.internal.serialization.SectionId
import net.corda.nodeapi.internal.serialization.kryo.byteArrayOutput
import net.corda.nodeapi.internal.serialization.byteArrayOutput
import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.io.OutputStream

View File

@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.serialization.amqp
import com.google.common.primitives.Primitives
import com.google.common.reflect.TypeResolver
import net.corda.core.internal.getStackTraceAsString
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.utilities.loggerFor
@ -247,7 +246,7 @@ open class SerializerFactory(
// preserve the actual message locally
loggerFor<SerializerFactory>().apply {
error("${e.message} [hint: enable trace debugging for the stack trace]")
trace(e.getStackTraceAsString())
trace("", e)
}
// prevent carpenter exceptions escaping into the world, convert things into a nice

View File

@ -1,5 +1,6 @@
package net.corda.nodeapi.internal.serialization.carpenter
import com.google.common.base.MoreObjects
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.CordaSerializable
import org.objectweb.asm.ClassWriter
@ -23,7 +24,7 @@ interface SimpleFieldAccess {
class CarpenterClassLoader(parentClassLoader: ClassLoader = Thread.currentThread().contextClassLoader) :
ClassLoader(parentClassLoader) {
fun load(name: String, bytes: ByteArray) = defineClass(name, bytes, 0, bytes.size)
fun load(name: String, bytes: ByteArray): Class<*> = defineClass(name, bytes, 0, bytes.size)
}
class InterfaceMismatchNonGetterException(val clazz: Class<*>, val method: Method) : InterfaceMismatchException(
@ -37,10 +38,12 @@ class InterfaceMismatchMissingAMQPFieldException(val clazz: Class<*>, val field:
*/
private const val TARGET_VERSION = V1_8
private val jlEnum get() = Type.getInternalName(Enum::class.java)
private val jlString get() = Type.getInternalName(String::class.java)
private val jlObject get() = Type.getInternalName(Object::class.java)
private val jlClass get() = Type.getInternalName(Class::class.java)
private val jlEnum: String = Type.getInternalName(Enum::class.java)
private val jlString: String = Type.getInternalName(String::class.java)
private val jlObject: String = Type.getInternalName(Object::class.java)
private val jlClass: String = Type.getInternalName(Class::class.java)
private val moreObjects: String = Type.getInternalName(MoreObjects::class.java)
private val toStringHelper: String = Type.getInternalName(MoreObjects.ToStringHelper::class.java)
/**
* A class carpenter generates JVM bytecodes for a class given a schema and then loads it into a sub-classloader.
@ -97,7 +100,6 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
val classloader = CarpenterClassLoader(cl)
private val _loaded = HashMap<String, Class<*>>()
private val String.jvm: String get() = replace(".", "/")
/** Returns a snapshot of the currently loaded classes as a map of full class name (package names+dots) -> class object */
val loaded: Map<String, Class<*>> = HashMap(_loaded)
@ -155,7 +157,7 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
private fun generateInterface(interfaceSchema: Schema): Class<*> {
return generate(interfaceSchema) { cw, schema ->
val interfaces = schema.interfaces.map { it.name.jvm }.toTypedArray()
val interfaces = schema.interfaces.map { Type.getInternalName(it) }.toTypedArray()
cw.apply {
visit(TARGET_VERSION, ACC_PUBLIC + ACC_ABSTRACT + ACC_INTERFACE, schema.jvmName, null,
@ -172,12 +174,12 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
private fun generateClass(classSchema: Schema): Class<*> {
return generate(classSchema) { cw, schema ->
val superName = schema.superclass?.jvmName ?: jlObject
val interfaces = schema.interfaces.map { it.name.jvm }.toMutableList()
val interfaces = schema.interfaces.map { Type.getInternalName(it) }.toMutableList()
if (SimpleFieldAccess::class.java !in schema.interfaces
&& schema.flags.cordaSerializable()
&& schema.flags.simpleFieldAccess()) {
interfaces.add(SimpleFieldAccess::class.java.name.jvm)
interfaces.add(Type.getInternalName(SimpleFieldAccess::class.java))
}
cw.apply {
@ -214,12 +216,11 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
}
private fun ClassWriter.generateToString(schema: Schema) {
val toStringHelper = "com/google/common/base/MoreObjects\$ToStringHelper"
with(visitMethod(ACC_PUBLIC, "toString", "()L$jlString;", null, null)) {
visitCode()
// com.google.common.base.MoreObjects.toStringHelper("TypeName")
visitLdcInsn(schema.name.split('.').last())
visitMethodInsn(INVOKESTATIC, "com/google/common/base/MoreObjects", "toStringHelper",
visitMethodInsn(INVOKESTATIC, moreObjects, "toStringHelper",
"(L$jlString;)L$toStringHelper;", false)
// Call the add() methods.
for ((name, field) in schema.fieldsIncludingSuperclasses().entries) {
@ -237,7 +238,7 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
}
private fun ClassWriter.generateGetMethod() {
val ourJvmName = ClassCarpenter::class.java.name.jvm
val ourJvmName = Type.getInternalName(ClassCarpenter::class.java)
with(visitMethod(ACC_PUBLIC, "get", "(L$jlString;)L$jlObject;", null, null)) {
visitCode()
visitVarInsn(ALOAD, 0) // Load 'this'
@ -372,7 +373,7 @@ class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader
var slot = 1
superclassFields.values.forEach { slot += load(slot, it) }
val superDesc = sc.descriptorsIncludingSuperclasses().values.joinToString("")
visitMethodInsn(INVOKESPECIAL, sc.name.jvm, "<init>", "($superDesc)V", false)
visitMethodInsn(INVOKESPECIAL, sc.jvmName, "<init>", "($superDesc)V", false)
}
// Assign the fields from parameters.

View File

@ -5,7 +5,6 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.ByteBufferInputStream
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
@ -39,8 +38,8 @@ abstract class AbstractKryoSerializationScheme : SerializationScheme {
protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool
protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool
// this can be overriden in derived serialization schemes
open protected val publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer
// this can be overridden in derived serialization schemes
protected open val publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer
private fun getPool(context: SerializationContext): KryoPool {
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {

View File

@ -1,40 +1,16 @@
@file:JvmName("KryoStreams")
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import net.corda.core.internal.LazyPool
import java.io.ByteArrayOutputStream
import net.corda.nodeapi.internal.serialization.byteArrayOutput
import java.io.InputStream
import java.io.OutputStream
import java.io.SequenceInputStream
import java.nio.ByteBuffer
class ByteBufferOutputStream(size: Int) : ByteArrayOutputStream(size) {
companion object {
private val ensureCapacity = ByteArrayOutputStream::class.java.getDeclaredMethod("ensureCapacity", Int::class.java).apply {
isAccessible = true
}
}
fun <T> alsoAsByteBuffer(remaining: Int, task: (ByteBuffer) -> T): T {
ensureCapacity.invoke(this, count + remaining)
val buffer = ByteBuffer.wrap(buf, count, remaining)
val result = task(buffer)
count = buffer.position()
return result
}
fun copyTo(stream: OutputStream) {
stream.write(buf, 0, count)
}
}
private val serializationBufferPool = LazyPool(
newInstance = { ByteArray(64 * 1024) })
internal val serializeOutputStreamPool = LazyPool(
clear = ByteBufferOutputStream::reset,
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
newInstance = { ByteBufferOutputStream(64 * 1024) })
internal fun <T> kryoInput(underlying: InputStream, task: Input.() -> T): T {
return serializationBufferPool.run {
@ -56,13 +32,6 @@ internal fun <T> kryoOutput(task: Output.() -> T): ByteArray {
}
}
internal fun <T> byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray {
return serializeOutputStreamPool.run { underlying ->
task(underlying)
underlying.toByteArray() // Must happen after close, to allow ZIP footer to be written for example.
}
}
internal fun Output.substitute(transform: (OutputStream) -> OutputStream) {
flush()
outputStream = transform(outputStream)

View File

@ -43,7 +43,7 @@ class TestSerializerFactoryFactory : SerializerFactoryFactory() {
}
}
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptyList(), TestSerializerFactoryFactory()) {
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptySet(), TestSerializerFactoryFactory()) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}

View File

@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.serialization.kryo
import net.corda.core.internal.declaredField
import net.corda.nodeapi.internal.serialization.ByteBufferOutputStream
import org.assertj.core.api.Assertions.catchThrowable
import org.junit.Assert.assertArrayEquals
import org.junit.Test