Moved serialisation code in client-rpc into internal packages (#1604)

This commit is contained in:
Shams Asari 2017-09-22 17:38:40 +01:00 committed by josecoll
parent 6649c0394b
commit 31229b900a
9 changed files with 112 additions and 125 deletions

View File

@ -1,18 +1,14 @@
package net.corda.client.rpc package net.corda.client.rpc
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.client.rpc.serialization.KryoClientSerializationScheme
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.internal.serialization.AMQPClientSerializationScheme
import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import java.time.Duration import java.time.Duration
/** @see RPCClient.RPCConnection */ /** @see RPCClient.RPCConnection */
@ -49,7 +45,7 @@ class CordaRPCClient(
// others having registered first. // others having registered first.
// TODO: allow clients to have serialization factory etc injected and align with RPC protocol version? // TODO: allow clients to have serialization factory etc injected and align with RPC protocol version?
if (initialiseSerialization) { if (initialiseSerialization) {
initialiseSerialization() KryoClientSerializationScheme.initialiseSerialization()
} }
} }
@ -66,21 +62,4 @@ class CordaRPCClient(
inline fun <A> use(username: String, password: String, block: (CordaRPCConnection) -> A): A { inline fun <A> use(username: String, password: String, block: (CordaRPCConnection) -> A): A {
return start(username, password).use(block) return start(username, password).use(block)
} }
companion object {
fun initialiseSerialization() {
try {
SerializationDefaults.SERIALIZATION_FACTORY = SerializationFactoryImpl().apply {
registerScheme(KryoClientSerializationScheme())
registerScheme(AMQPClientSerializationScheme())
}
SerializationDefaults.P2P_CONTEXT = KRYO_P2P_CONTEXT
SerializationDefaults.RPC_CLIENT_CONTEXT = KRYO_RPC_CLIENT_CONTEXT
} catch(e: IllegalStateException) {
// Check that it's registered as we expect
check(SerializationDefaults.SERIALIZATION_FACTORY is SerializationFactoryImpl) { "RPC client encountered conflicting configuration of serialization subsystem." }
check((SerializationDefaults.SERIALIZATION_FACTORY as SerializationFactoryImpl).alreadyRegisteredSchemes.any { it is KryoClientSerializationScheme }) { "RPC client encountered conflicting configuration of serialization subsystem." }
}
}
}
} }

View File

@ -0,0 +1,41 @@
package net.corda.client.rpc.internal
import com.esotericsoftware.kryo.pool.KryoPool
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.serialization.*
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
return byteSequence == KryoHeaderV0_1 && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
}
override fun rpcClientKryoPool(context: SerializationContext): KryoPool {
return KryoPool.Builder {
DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context)).apply {
classLoader = context.deserializationClassLoader
}
}.build()
}
// We're on the client and don't have access to server classes.
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
companion object {
fun initialiseSerialization() {
try {
SerializationDefaults.SERIALIZATION_FACTORY = SerializationFactoryImpl().apply {
registerScheme(KryoClientSerializationScheme())
registerScheme(AMQPClientSerializationScheme())
}
SerializationDefaults.P2P_CONTEXT = KRYO_P2P_CONTEXT
SerializationDefaults.RPC_CLIENT_CONTEXT = KRYO_RPC_CLIENT_CONTEXT
} catch(e: IllegalStateException) {
// Check that it's registered as we expect
check(SerializationDefaults.SERIALIZATION_FACTORY is SerializationFactoryImpl) { "RPC client encountered conflicting configuration of serialization subsystem." }
check((SerializationDefaults.SERIALIZATION_FACTORY as SerializationFactoryImpl).alreadyRegisteredSchemes.any { it is KryoClientSerializationScheme }) { "RPC client encountered conflicting configuration of serialization subsystem." }
}
}
}
}

View File

@ -1,27 +0,0 @@
package net.corda.client.rpc.serialization
import com.esotericsoftware.kryo.pool.KryoPool
import net.corda.client.rpc.internal.RpcClientObservableSerializer
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.RPCKryo
import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme
import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer
import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
return byteSequence == KryoHeaderV0_1 && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
}
override fun rpcClientKryoPool(context: SerializationContext): KryoPool {
return KryoPool.Builder {
DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context)).apply { classLoader = context.deserializationClassLoader }
}.build()
}
// We're on the client and don't have access to server classes.
override fun rpcServerKryoPool(context: SerializationContext): KryoPool {
throw UnsupportedOperationException()
}
}

View File

@ -1,65 +0,0 @@
@file:JvmName("RPCStructures")
package net.corda.nodeapi
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import net.corda.core.concurrent.CordaFuture
import net.corda.core.serialization.SerializationContext
import net.corda.core.toFuture
import net.corda.core.toObservable
import net.corda.nodeapi.config.OldConfig
import net.corda.nodeapi.internal.serialization.*
import rx.Observable
import java.io.InputStream
data class User(
@OldConfig("user")
val username: String,
val password: String,
val permissions: Set<String>) {
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
fun toMap() = mapOf(
"username" to username,
"password" to password,
"permissions" to permissions
)
}
/**
* The Kryo used for the RPC wire protocol.
*/
// Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
init {
DefaultKryoCustomizer.customize(this)
// RPC specific classes
register(InputStream::class.java, InputStreamSerializer)
register(Observable::class.java, observableSerializer)
register(CordaFuture::class,
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() },
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
)
}
override fun getRegistration(type: Class<*>): Registration {
if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) {
return super.getRegistration(Observable::class.java)
}
if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) {
return super.getRegistration(InputStream::class.java)
}
if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) {
return super.getRegistration(CordaFuture::class.java)
}
type.requireExternal("RPC not allowed to deserialise internal classes")
return super.getRegistration(type)
}
private fun Class<*>.requireExternal(msg: String) {
require(!name.startsWith("net.corda.node.") && !name.contains(".internal.")) { "$msg: $name" }
}
}

View File

@ -0,0 +1,16 @@
package net.corda.nodeapi
import net.corda.nodeapi.config.OldConfig
data class User(
@OldConfig("user")
val username: String,
val password: String,
val permissions: Set<String>) {
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
fun toMap() = mapOf(
"username" to username,
"password" to password,
"permissions" to permissions
)
}

View File

@ -7,14 +7,18 @@ import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import com.esotericsoftware.kryo.serializers.FieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer
import com.esotericsoftware.kryo.util.MapReferenceResolver import com.esotericsoftware.kryo.util.MapReferenceResolver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.PrivacySalt import net.corda.core.contracts.PrivacySalt
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Crypto import net.corda.core.crypto.Crypto
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.toFuture
import net.corda.core.toObservable
import net.corda.core.transactions.* import net.corda.core.transactions.*
import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey import net.i2p.crypto.eddsa.EdDSAPublicKey
@ -26,6 +30,7 @@ import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.X509CertificateHolder import org.bouncycastle.cert.X509CertificateHolder
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable
import sun.security.ec.ECPublicKeyImpl import sun.security.ec.ECPublicKeyImpl
import sun.security.util.DerValue import sun.security.util.DerValue
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
@ -423,6 +428,44 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe
} }
} }
/**
* The Kryo used for the RPC wire protocol.
*/
// Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
init {
DefaultKryoCustomizer.customize(this)
// RPC specific classes
register(InputStream::class.java, InputStreamSerializer)
register(Observable::class.java, observableSerializer)
register(CordaFuture::class,
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() },
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
)
}
override fun getRegistration(type: Class<*>): Registration {
if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) {
return super.getRegistration(Observable::class.java)
}
if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) {
return super.getRegistration(InputStream::class.java)
}
if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) {
return super.getRegistration(CordaFuture::class.java)
}
type.requireExternal("RPC not allowed to deserialise internal classes")
return super.getRegistration(type)
}
private fun Class<*>.requireExternal(msg: String) {
require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" }
}
}
inline fun <T : Any> Kryo.register( inline fun <T : Any> Kryo.register(
type: KClass<T>, type: KClass<T>,
crossinline read: (Kryo, Input) -> T, crossinline read: (Kryo, Input) -> T,
@ -445,7 +488,7 @@ inline fun <reified T : Any> Kryo.noReferencesWithin() {
register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java))) register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java)))
} }
class NoReferencesSerializer<T>(val baseSerializer: Serializer<T>) : Serializer<T>() { class NoReferencesSerializer<T>(private val baseSerializer: Serializer<T>) : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>): T { override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
return kryo.withoutReferences { baseSerializer.read(kryo, input, type) } return kryo.withoutReferences { baseSerializer.read(kryo, input, type) }

View File

@ -4,23 +4,23 @@ import com.esotericsoftware.kryo.pool.KryoPool
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ByteSequence
import net.corda.node.services.messaging.RpcServerObservableSerializer import net.corda.node.services.messaging.RpcServerObservableSerializer
import net.corda.nodeapi.RPCKryo
import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme
import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer
import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1 import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
import net.corda.nodeapi.internal.serialization.RPCKryo
class KryoServerSerializationScheme : AbstractKryoSerializationScheme() { class KryoServerSerializationScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean { override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
return byteSequence == KryoHeaderV0_1 && target != SerializationContext.UseCase.RPCClient return byteSequence == KryoHeaderV0_1 && target != SerializationContext.UseCase.RPCClient
} }
override fun rpcClientKryoPool(context: SerializationContext): KryoPool { override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
throw UnsupportedOperationException()
}
override fun rpcServerKryoPool(context: SerializationContext): KryoPool { override fun rpcServerKryoPool(context: SerializationContext): KryoPool {
return KryoPool.Builder { return KryoPool.Builder {
DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context)).apply { classLoader = context.deserializationClassLoader } DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context)).apply {
classLoader = context.deserializationClassLoader
}
}.build() }.build()
} }
} }

View File

@ -1,7 +1,7 @@
package net.corda.testing package net.corda.testing
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
@ -498,7 +498,7 @@ class RandomRpcUser {
companion object { companion object {
private inline fun <reified T> HashMap<Class<*>, Generator<*>>.add(generator: Generator<T>) = this.putIfAbsent(T::class.java, generator) private inline fun <reified T> HashMap<Class<*>, Generator<*>>.add(generator: Generator<T>) = this.putIfAbsent(T::class.java, generator)
val generatorStore = HashMap<Class<*>, Generator<*>>().apply { private val generatorStore = HashMap<Class<*>, Generator<*>>().apply {
add(Generator.string()) add(Generator.string())
add(Generator.int()) add(Generator.int())
} }
@ -512,7 +512,7 @@ class RandomRpcUser {
val hostAndPort = NetworkHostAndPort.parse(args[1]) val hostAndPort = NetworkHostAndPort.parse(args[1])
val username = args[2] val username = args[2]
val password = args[3] val password = args[3]
CordaRPCClient.initialiseSerialization() KryoClientSerializationScheme.initialiseSerialization()
val handle = RPCClient<RPCOps>(hostAndPort, null, serializationContext = KRYO_RPC_CLIENT_CONTEXT).start(rpcClass, username, password) val handle = RPCClient<RPCOps>(hostAndPort, null, serializationContext = KRYO_RPC_CLIENT_CONTEXT).start(rpcClass, username, password)
val callGenerators = rpcClass.declaredMethods.map { method -> val callGenerators = rpcClass.declaredMethods.map { method ->
Generator.sequence(method.parameters.map { Generator.sequence(method.parameters.map {

View File

@ -1,6 +1,6 @@
package net.corda.testing package net.corda.testing
import net.corda.client.rpc.serialization.KryoClientSerializationScheme import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.* import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ByteSequence