mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-847 - RPC Server lib refactoring (#3056)
Just as we did for the RPC CLient, refactor kryo specific elements into their own sub module. Also move kryo specific components out of generic RPC files. Thus, adding AMQP support will be a much smoother operation
This commit is contained in:
parent
884928c956
commit
0d3c7e7762
@ -6,11 +6,13 @@ release, see :doc:`upgrade-notes`.
|
||||
|
||||
Unreleased
|
||||
==========
|
||||
|
||||
* Refactor RPC Client Kryo observable serialiser into it's own sub module
|
||||
|
||||
* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialised in
|
||||
a factory without a serialiser as the subtype check for the class instance failed. Fix is to compare the raw
|
||||
* Refactor RPC Server Kryo observable serializer into it's own sub module
|
||||
|
||||
* Refactor RPC Client Kryo observable serializer into it's own sub module
|
||||
|
||||
* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialized in
|
||||
a factory without a serializer as the subtype check for the class instance failed. Fix is to compare the raw
|
||||
type.
|
||||
|
||||
* Due to ongoing work the experimental interfaces for defining custom notary services have been moved to the internal package.
|
||||
|
@ -8,7 +8,7 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.createDevKeyStores
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
|
@ -14,7 +14,7 @@ import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
|
@ -25,7 +25,7 @@ import net.corda.node.internal.artemis.BrokerAddresses
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.api.NodePropertiesStore
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.*
|
||||
|
@ -1,9 +1,8 @@
|
||||
package net.corda.node.serialization
|
||||
package net.corda.node.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.node.services.messaging.RpcServerObservableSerializer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
@ -0,0 +1,87 @@
|
||||
package net.corda.node.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.node.services.messaging.ObservableSubscription
|
||||
import net.corda.node.services.messaging.RPCServer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
|
||||
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||
return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<*>>?): Observable<Any> {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
val observableId = Trace.InvocationId.newInstance()
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext
|
||||
output.writeInvocationId(observableId)
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
|
||||
// must be done again within the subscriber
|
||||
subscription = observable.materialize().subscribe(
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onError(exception: Throwable) {
|
||||
log.error("onError called in materialize()d RPC Observable", exception)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables != null) {
|
||||
observables.remove(observableId)
|
||||
if (observables.isEmpty()) {
|
||||
null
|
||||
} else {
|
||||
observables
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables == null) {
|
||||
hashSetOf(observableId)
|
||||
} else {
|
||||
observables.add(observableId)
|
||||
observables
|
||||
}
|
||||
}
|
||||
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||
}
|
||||
|
||||
private fun Output.writeInvocationId(id: Trace.InvocationId) {
|
||||
|
||||
writeString(id.value)
|
||||
writeLong(id.timestamp.toEpochMilli())
|
||||
}
|
||||
}
|
@ -1,10 +1,6 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import co.paralleluniverse.common.util.SameThreadExecutor
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||
@ -24,6 +20,7 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.internal.security.AuthorizingSubject
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.serialization.kryo.RpcServerObservableSerializer
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.externalTrace
|
||||
@ -39,11 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.slf4j.MDC
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.Subscription
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
@ -486,74 +479,3 @@ class ObservableSubscription(
|
||||
)
|
||||
|
||||
typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||
|
||||
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||
return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<*>>?): Observable<Any> {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
val observableId = InvocationId.newInstance()
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext
|
||||
output.writeInvocationId(observableId)
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
|
||||
// must be done again within the subscriber
|
||||
subscription = observable.materialize().subscribe(
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onError(exception: Throwable) {
|
||||
log.error("onError called in materialize()d RPC Observable", exception)
|
||||
}
|
||||
|
||||
override fun onCompleted() {
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables != null) {
|
||||
observables.remove(observableId)
|
||||
if (observables.isEmpty()) {
|
||||
null
|
||||
} else {
|
||||
observables
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||
if (observables == null) {
|
||||
hashSetOf(observableId)
|
||||
} else {
|
||||
observables.add(observableId)
|
||||
observables
|
||||
}
|
||||
}
|
||||
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||
}
|
||||
|
||||
private fun Output.writeInvocationId(id: InvocationId) {
|
||||
|
||||
writeString(id.value)
|
||||
writeLong(id.timestamp.toEpochMilli())
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.serialization.internal.*
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
|
Loading…
Reference in New Issue
Block a user