CORDA-847 - RPC Clent lib refactoring (#3052)

Move Kryo into it's own sub module
This commit is contained in:
Katelyn Baker 2018-05-01 20:52:19 +01:00 committed by GitHub
parent 0c680ae530
commit e338414cd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 85 additions and 76 deletions

View File

@ -1,6 +1,6 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Actor

View File

@ -14,6 +14,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion
import net.corda.client.rpc.internal.serialization.kryo.RpcClientObservableSerializer
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
@ -547,62 +548,3 @@ data class ObservableContext(
val hardReferenceStore: MutableSet<Observable<*>>
)
/**
* A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
*/
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
private object RpcObservableContextKey
fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext {
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
}
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
val refCount = AtomicInteger(0)
return observable.doOnSubscribe {
if (refCount.getAndIncrement() == 0) {
require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" }
}
}.doOnUnsubscribe {
if (refCount.decrementAndGet() == 0) {
require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" }
}
}
}
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.")
val observable = UnicastSubject.create<Notification<*>>()
require(observableContext.observableMap.getIfPresent(observableId) == null) {
"Multiple Observables arrived with the same ID $observableId"
}
val rpcCallSite = getRpcCallSite(kryo, observableContext)
observableContext.observableMap.put(observableId, observable)
observableContext.callSiteMap?.put(observableId, rpcCallSite)
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
// don't need to store a reference to the Observables themselves.
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
// The unsubscribe is due to [ObservableToFuture]'s use of first().
observableContext.observableMap.invalidate(observableId)
}.dematerialize()
}
private fun Input.readInvocationId() : InvocationId? {
val value = readString() ?: return null
val timestamp = readLong()
return InvocationId(value, Instant.ofEpochMilli(timestamp))
}
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
throw UnsupportedOperationException("Cannot serialise Observables on the client side")
}
private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? {
val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as InvocationId
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
}
}

View File

@ -1,4 +1,4 @@
package net.corda.client.rpc.internal
package net.corda.client.rpc.internal.serialization.kryo
import com.esotericsoftware.kryo.pool.KryoPool
import net.corda.core.serialization.SerializationContext

View File

@ -0,0 +1,75 @@
package net.corda.client.rpc.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import net.corda.client.rpc.internal.ObservableContext
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.nodeapi.RPCApi
import rx.Notification
import rx.Observable
import rx.subjects.UnicastSubject
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
/**
* A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
*/
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
private object RpcObservableContextKey
fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext {
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
}
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
val refCount = AtomicInteger(0)
return observable.doOnSubscribe {
if (refCount.getAndIncrement() == 0) {
require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" }
}
}.doOnUnsubscribe {
if (refCount.decrementAndGet() == 0) {
require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" }
}
}
}
override fun read(kryo: Kryo, input: Input, type: Class<Observable<*>>): Observable<Any> {
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.")
val observable = UnicastSubject.create<Notification<*>>()
require(observableContext.observableMap.getIfPresent(observableId) == null) {
"Multiple Observables arrived with the same ID $observableId"
}
val rpcCallSite = getRpcCallSite(kryo, observableContext)
observableContext.observableMap.put(observableId, observable)
observableContext.callSiteMap?.put(observableId, rpcCallSite)
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
// don't need to store a reference to the Observables themselves.
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
// The unsubscribe is due to [ObservableToFuture]'s use of first().
observableContext.observableMap.invalidate(observableId)
}.dematerialize()
}
private fun Input.readInvocationId() : Trace.InvocationId? {
val value = readString() ?: return null
val timestamp = readLong()
return Trace.InvocationId(value, Instant.ofEpochMilli(timestamp))
}
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
throw UnsupportedOperationException("Cannot serialise Observables on the client side")
}
private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? {
val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
}
}

View File

@ -7,6 +7,8 @@ release, see :doc:`upgrade-notes`.
Unreleased
==========
* Refactor RPC Client Kryo observable serialiser into it's own sub module
* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialised in
a factory without a serialiser as the subtype check for the class instance failed. Fix is to compare the raw
type.

View File

@ -1,7 +1,7 @@
package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch

View File

@ -1,7 +1,7 @@
package net.corda.testing.node.internal
import net.corda.client.mock.Generator
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.concurrent.CordaFuture

View File

@ -2,7 +2,7 @@ package net.corda.smoketesting
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.core.internal.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger

View File

@ -2,19 +2,10 @@ package net.corda.testing.core
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doAnswer
import com.nhaarman.mockito_kotlin.doNothing
import com.nhaarman.mockito_kotlin.whenever
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.core.DoNotImplement
import net.corda.core.internal.staticField
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal._globalSerializationEnv
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import net.corda.testing.common.internal.asContextEnv
import net.corda.testing.internal.createTestSerializationEnv
import net.corda.testing.internal.inVMExecutors
@ -24,7 +15,6 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

View File

@ -2,7 +2,7 @@ package net.corda.testing.internal
import com.nhaarman.mockito_kotlin.doNothing
import com.nhaarman.mockito_kotlin.whenever
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.core.DoNotImplement
import net.corda.core.serialization.internal.*
import net.corda.node.serialization.KryoServerSerializationScheme

View File

@ -1,7 +1,7 @@
package net.corda.demobench
import javafx.scene.image.Image
import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.demobench.views.DemoBenchView