Merge remote-tracking branch 'open/master' into kat-merge-20180517

This commit is contained in:
Katelyn Baker
2018-05-17 15:04:51 +01:00
80 changed files with 6509 additions and 3351 deletions

View File

@ -76,6 +76,11 @@ dependencies {
testCompile "junit:junit:$junit_version"
testCompile "org.assertj:assertj-core:$assertj_version"
testCompile project(':node-driver')
compile ("org.apache.activemq:artemis-amqp-protocol:${artemis_version}") {
// Gains our proton-j version from core module.
exclude group: 'org.apache.qpid', module: 'proton-j'
}
}
configurations {

View File

@ -28,7 +28,9 @@ class InternalNodeException(message: String) : CordaRuntimeException(message) {
(wrapped as? CordaRuntimeException)?.setCause(null)
return when {
whitelisted.any { it.isInstance(wrapped) } -> wrapped
else -> InternalNodeException(DEFAULT_MESSAGE)
else -> InternalNodeException(DEFAULT_MESSAGE).apply {
stackTrace = emptyArray()
}
}
}
}

View File

@ -56,6 +56,7 @@ class ArtemisMessagingClient(
minLargeMessageSize = maxMessageSize
isUseGlobalPools = nodeSerializationEnv != null
confirmationWindowSize = this@ArtemisMessagingClient.confirmationWindowSize
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
}
val sessionFactory = locator.createSessionFactory()
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)

View File

@ -40,7 +40,9 @@ class ArtemisMessagingComponent {
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
// This is a rough guess on the extra space needed on top of maxMessageSize to store the journal.
// TODO: we might want to make this value configurable.
const val JOURNAL_HEADER_SIZE = 1024
object P2PMessagingHeaders {
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint

View File

@ -22,3 +22,7 @@ import java.nio.file.Path
fun Path.requireOnDefaultFileSystem() {
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun requireMessageSize(messageSize: Int, limit: Int) {
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
}

View File

@ -0,0 +1,50 @@
package net.corda.nodeapi.internal
import net.corda.core.utilities.contextLogger
import org.apache.activemq.artemis.api.core.BaseInterceptor
import org.apache.activemq.artemis.api.core.Interceptor
import org.apache.activemq.artemis.core.protocol.core.Packet
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor {
override fun getMessageSize(packet: Packet?): Int? {
return when (packet) {
// This is an estimate of how much memory a Message body takes up.
// Note, it is only an estimate
is MessagePacket -> (packet.message.persistentSize - packet.message.headersAndPropertiesEncodeSize - 4).toInt()
// Skip all artemis control messages.
else -> null
}
}
}
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor {
override fun getMessageSize(packet: AMQPMessage?): Int? = packet?.length
}
/**
* Artemis message interceptor to enforce maxMessageSize on incoming messages.
*/
sealed class MessageSizeChecksInterceptor<T : Any>(private val maxMessageSize: Int) : BaseInterceptor<T> {
companion object {
private val logger = contextLogger()
}
override fun intercept(packet: T, connection: RemotingConnection?): Boolean {
val messageSize = getMessageSize(packet) ?: return true
return if (messageSize > maxMessageSize) {
logger.warn("Message size exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [$messageSize], " +
"dropping message, client id :${connection?.clientID}")
false
} else {
true
}
}
// get size of the message in byte, returns null if the message is null or size don't need to be checked.
abstract fun getMessageSize(packet: T?): Int?
}

View File

@ -48,7 +48,8 @@ import kotlin.concurrent.withLock
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/
@VisibleForTesting
class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConfig: SocksProxyConfig? = null, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConfig: SocksProxyConfig? = null,
private val maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
private val lock = ReentrantLock()
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
@ -59,7 +60,8 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
private var artemis: ArtemisSessionProvider? = null
private val crlCheckSoftFail: Boolean = config.crlCheckSoftFail
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
@ -82,14 +84,16 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
crlCheckSoftFail: Boolean,
sharedEventGroup: EventLoopGroup,
socksProxyConfig: SocksProxyConfig?,
private val artemis: ArtemisSessionProvider) {
private val artemis: ArtemisSessionProvider,
private val maxMessageSize: Int) {
companion object {
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
}
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig)
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail,
sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize)
val bridgeName: String get() = getBridgeName(queueName, target)
private val lock = ReentrantLock() // lock to serialise session level access
private var session: ClientSession? = null
@ -141,6 +145,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
if (artemisMessage.bodySize > maxMessageSize) {
log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
// Ack the message to prevent same message being sent to us again.
artemisMessage.acknowledge()
return
}
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
val properties = HashMap<String, Any?>()
for (key in P2PMessagingHeaders.whitelistedHeaders) {
@ -183,7 +194,9 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
if (bridgeExists(getBridgeName(queueName, target))) {
return
}
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, socksProxyConfig, artemis!!)
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, socksProxyConfig, artemis!!, maxMessageSize)
lock.withLock {
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
}

View File

@ -33,19 +33,24 @@ import rx.subjects.PublishSubject
import java.util.*
class BridgeControlListener(val config: NodeSSLConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
maxMessageSize: Int,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory)
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize,
artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null
constructor(config: NodeSSLConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int,
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private val log = contextLogger()

View File

@ -33,10 +33,11 @@ import net.corda.nodeapi.internal.ContractsJarFile
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
import java.nio.file.Path
@ -288,7 +289,7 @@ class NetworkBootstrapper {
_contextSerializationEnv.set(SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {
registerScheme(KryoParametersSerializationScheme)
registerScheme(AMQPServerSerializationScheme())
registerScheme(AMQPParametersSerializationScheme)
},
AMQP_P2P_CONTEXT)
)
@ -302,4 +303,13 @@ class NetworkBootstrapper {
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
}
private object AMQPParametersSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return magic == amqpMagic && target == SerializationContext.UseCase.P2P
}
}
}

View File

@ -362,7 +362,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
val connection = event.connection
val channel = connection?.context as? Channel
if (channel != null) {
val appProperties = HashMap(amqpMessage.applicationProperties.value as Map<String, Any?>)
val appProperties = HashMap(amqpMessage.applicationProperties.value)
appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName
val localAddress = channel.localAddress() as InetSocketAddress
val remoteAddress = channel.remoteAddress() as InetSocketAddress

View File

@ -27,6 +27,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.requireMessageSize
import rx.Observable
import rx.subjects.PublishSubject
import java.net.InetSocketAddress
@ -68,7 +69,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
private val crlCheckSoftFail: Boolean,
private val trace: Boolean = false,
private val sharedThreadPool: EventLoopGroup? = null,
private val socksProxyConfig: SocksProxyConfig? = null) : AutoCloseable {
private val socksProxyConfig: SocksProxyConfig? = null,
private val maxMessageSize: Int) : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -118,17 +120,15 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
}
}
private val closeListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
log.info("Disconnected from $currentTarget")
future.channel()?.disconnect()
clientChannel = null
if (!stopping) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
private val closeListener = ChannelFutureListener { future ->
log.info("Disconnected from $currentTarget")
future.channel()?.disconnect()
clientChannel = null
if (!stopping) {
workerGroup?.schedule({
nextTarget()
restart()
}, retryInterval, TimeUnit.MILLISECONDS)
}
}
@ -228,6 +228,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
topic: String,
destinationLegalName: String,
properties: Map<String, Any?>): SendableMessage {
requireMessageSize(payload.size, maxMessageSize)
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
}

View File

@ -27,6 +27,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.requireMessageSize
import org.apache.qpid.proton.engine.Delivery
import rx.Observable
import rx.subjects.PublishSubject
@ -51,7 +52,8 @@ class AMQPServer(val hostName: String,
private val keyStorePrivateKeyPassword: CharArray,
private val trustStore: KeyStore,
private val crlCheckSoftFail: Boolean,
private val trace: Boolean = false) : AutoCloseable {
private val trace: Boolean = false,
private val maxMessageSize: Int) : AutoCloseable {
companion object {
init {
@ -78,7 +80,8 @@ class AMQPServer(val hostName: String,
keyStorePrivateKeyPassword: String,
trustStore: KeyStore,
crlCheckSoftFail: Boolean,
trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, trace)
trace: Boolean = false,
maxMessageSize: Int) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, trace, maxMessageSize)
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
@ -166,6 +169,7 @@ class AMQPServer(val hostName: String,
destinationLegalName: String,
destinationLink: NetworkHostAndPort,
properties: Map<String, Any?>): SendableMessage {
requireMessageSize(payload.size, maxMessageSize)
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
require(dest in clientChannels.keys) {
"Destination not available"

View File

@ -23,13 +23,7 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
* servers from trying to instantiate any of them.
*/
val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
emptyMap(),
true,
SerializationContext.UseCase.RPCClient,
null)
val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),

View File

@ -124,8 +124,9 @@ open class SerializationFactoryImpl(
val lookupKey = magic to target
return schemes.computeIfAbsent(lookupKey) {
registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single?
logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes")
throw UnsupportedOperationException("Serialization scheme not supported.")
logger.warn("Cannot find serialization scheme for: [$lookupKey, " +
"${if (magic == amqpMagic) "AMQP" else if (magic == kryoMagic) "Kryo" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes")
throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.")
} to magic
}

View File

@ -27,13 +27,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
* MUST be kept separate!
*/
val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic,
SerializationDefaults.javaClass.classLoader,
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
emptyMap(),
true,
SerializationContext.UseCase.RPCServer,
null)
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
SerializationDefaults.javaClass.classLoader,

View File

@ -15,6 +15,7 @@ package net.corda.nodeapi.internal.serialization.amqp
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.objectOrNewInstance
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
@ -22,7 +23,6 @@ import net.corda.nodeapi.internal.serialization.DefaultWhitelist
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
import net.corda.nodeapi.internal.serialization.SerializationScheme
import java.lang.reflect.Modifier
import java.security.PublicKey
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@ -128,6 +128,12 @@ abstract class AbstractAMQPSerializationScheme(
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
}
context.properties[ContextPropertyKeys.SERIALIZERS]?.apply {
uncheckedCast<Any, List<CustomSerializer<out Any>>>(this).forEach {
factory.register(it)
}
}
}
/*
@ -141,7 +147,9 @@ abstract class AbstractAMQPSerializationScheme(
protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory
protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory
protected open val publicKeySerializer: CustomSerializer.Implements<PublicKey> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer
// Not used as a simple direct import to facilitate testing
open val publicKeySerializer : CustomSerializer<*> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
@ -172,52 +180,3 @@ abstract class AbstractAMQPSerializationScheme(
protected fun canDeserializeVersion(magic: CordaSerializationMagic) = magic == amqpMagic
}
// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented
class AMQPServerSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
constructor() : this(emptySet(), ConcurrentHashMap())
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return canDeserializeVersion(magic) &&
(target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage)
}
}
// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented
class AMQPClientSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
constructor() : this(emptySet(), ConcurrentHashMap())
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return canDeserializeVersion(magic) &&
(target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage)
}
}

View File

@ -295,7 +295,11 @@ open class SerializerFactory(
}
private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer<Any> = serializersByType.computeIfAbsent(type) {
if (isPrimitive(clazz)) {
if (clazz.isSynthetic) {
// Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also
// captures Lambda expressions and other anonymous functions
throw NotSerializableException(type.typeName)
} else if (isPrimitive(clazz)) {
AMQPPrimitiveSerializer(clazz)
} else {
findCustomSerializer(clazz, declaredType) ?: run {

View File

@ -12,7 +12,6 @@ package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
import net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer.ClassProxy
/**
* A serializer for [Class] that uses [ClassProxy] proxy object to write out

View File

@ -49,7 +49,7 @@ object InputStreamSerializer : CustomSerializer.Implements<InputStream>(InputStr
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput,
context: SerializationContext
): InputStream {
) : InputStream {
val bits = input.readObject(obj, schemas, ByteArray::class.java, context) as ByteArray
return bits.inputStream()
}

View File

@ -0,0 +1,28 @@
package net.corda.nodeapi.internal.serialization.amqp.custom
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
import rx.Notification
class RxNotificationSerializer(
factory: SerializerFactory
) : CustomSerializer.Proxy<rx.Notification<*>, RxNotificationSerializer.Proxy>(
Notification::class.java,
Proxy::class.java,
factory
) {
data class Proxy(
val kind: Notification.Kind,
val t: Throwable?,
val value: Any?)
override fun toProxy(obj: Notification<*>) = Proxy(obj.kind, obj.throwable, obj.value)
override fun fromProxy(proxy: Proxy): Notification<*> {
return when (proxy.kind) {
Notification.Kind.OnCompleted -> Notification.createOnCompleted<Any>()
Notification.Kind.OnError -> Notification.createOnError<Any>(proxy.t)
Notification.Kind.OnNext -> Notification.createOnNext(proxy.value)
}
}
}

View File

@ -17,7 +17,6 @@ import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import com.esotericsoftware.kryo.serializers.FieldSerializer
import com.esotericsoftware.kryo.util.MapReferenceResolver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.PrivacySalt
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
@ -28,8 +27,6 @@ import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint
import net.corda.core.serialization.SerializationContext.UseCase.Storage
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.toFuture
import net.corda.core.toObservable
import net.corda.core.transactions.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
@ -38,7 +35,6 @@ import net.corda.nodeapi.internal.serialization.CordaClassResolver
import net.corda.nodeapi.internal.serialization.serializationContextKey
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Observable
import java.io.InputStream
import java.lang.reflect.InvocationTargetException
import java.security.PrivateKey
@ -57,39 +53,16 @@ import kotlin.reflect.jvm.isAccessible
import kotlin.reflect.jvm.javaType
/**
* Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead
* simple, totally non-extensible binary (sub)format.
*
* This is NOT what should be used in any final platform product, rather, the final state should be a precisely
* specified and standardised binary format with attention paid to anti-malleability, versioning and performance.
* FIX SBE is a potential candidate: it prioritises performance over convenience and was designed for HFT. Google
* Protocol Buffers with a minor tightening to make field reordering illegal is another possibility.
*
* FIX SBE:
* https://real-logic.github.io/simple-binary-encoding/
* http://mechanical-sympathy.blogspot.co.at/2014/05/simple-binary-encoding.html
* Protocol buffers:
* https://developers.google.com/protocol-buffers/
*
* But for now we use Kryo to maximise prototyping speed.
*
* Note that this code ignores *ALL* concerns beyond convenience, in particular it ignores:
*
* - Performance
* - Security
*
* This code will happily deserialise literally anything, including malicious streams that would reconstruct classes
* in invalid states, thus violating system invariants. It isn't designed to handle malicious streams and therefore,
* isn't usable beyond the prototyping stage. But that's fine: we can revisit serialisation technologies later after
* a formal evaluation process.
*
* We now distinguish between internal, storage related Kryo and external, network facing Kryo. We presently use
* some non-whitelisted classes as part of internal storage.
* TODO: eliminate internal, storage related whitelist issues, such as private keys in blob storage.
* Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead
* simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as
* it will happily deserialise literally anything, including malicious streams that would reconstruct classes
* in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is
* absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have
* the AMQP implementation.
*/
/**
* A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure
* A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure
* type safety hack.
*/
object SerializedBytesSerializer : Serializer<SerializedBytes<Any>>() {
@ -405,44 +378,6 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe
}
}
/**
* The Kryo used for the RPC wire protocol.
*/
// Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
class RPCKryo(observableSerializer: Serializer<Observable<*>>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) {
init {
DefaultKryoCustomizer.customize(this)
// RPC specific classes
register(InputStream::class.java, InputStreamSerializer)
register(Observable::class.java, observableSerializer)
register(CordaFuture::class,
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() },
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
)
}
override fun getRegistration(type: Class<*>): Registration {
if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) {
return super.getRegistration(Observable::class.java)
}
if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) {
return super.getRegistration(InputStream::class.java)
}
if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) {
return super.getRegistration(CordaFuture::class.java)
}
type.requireExternal("RPC not allowed to deserialise internal classes")
return super.getRegistration(type)
}
private fun Class<*>.requireExternal(msg: String) {
require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" }
}
}
inline fun <T : Any> Kryo.register(
type: KClass<T>,
crossinline read: (Kryo, Input) -> T,

View File

@ -14,13 +14,13 @@ import com.google.common.collect.Maps;
import net.corda.core.serialization.SerializationContext;
import net.corda.core.serialization.SerializationFactory;
import net.corda.core.serialization.SerializedBytes;
import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer;
import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt;
import net.corda.nodeapi.internal.serialization.amqp.SchemaKt;
import net.corda.testing.core.SerializationEnvironmentRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.concurrent.Callable;
@ -43,20 +43,17 @@ public final class ForbiddenLambdaSerializationTests {
@Test
public final void serialization_fails_for_serializable_java_lambdas() {
contexts.forEach(ctx -> {
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(),
SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(),
this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null);
String value = "Hey";
Callable<String> target = (Callable<String> & Serializable) () -> value;
Throwable throwable = catchThrowable(() -> serialize(target, context));
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE);
} else {
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
}
assertThat(throwable)
.isNotNull()
.isInstanceOf(NotSerializableException.class)
.hasMessageContaining(getClass().getName());
});
}
@ -64,21 +61,17 @@ public final class ForbiddenLambdaSerializationTests {
@SuppressWarnings("unchecked")
public final void serialization_fails_for_not_serializable_java_lambdas() {
contexts.forEach(ctx -> {
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(),
SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(),
this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null);
String value = "Hey";
Callable<String> target = () -> value;
Throwable throwable = catchThrowable(() -> serialize(target, context));
assertThat(throwable).isNotNull();
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
assertThat(throwable).isInstanceOf(IllegalArgumentException.class);
if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) {
assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE);
} else {
assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes");
}
assertThat(throwable)
.isNotNull()
.isInstanceOf(NotSerializableException.class)
.hasMessageContaining(getClass().getName());
});
}

View File

@ -18,13 +18,13 @@ import net.corda.core.internal.div
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.createDevKeyStores
import net.corda.nodeapi.internal.serialization.AllWhitelist
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.TestIdentity
@ -345,8 +345,8 @@ class X509UtilitiesTest {
@Test
fun `serialize - deserialize X509Certififcate`() {
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
val context = SerializationContextImpl(kryoMagic,
val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) }
val context = SerializationContextImpl(amqpMagic,
javaClass.classLoader,
AllWhitelist,
emptyMap(),
@ -361,8 +361,8 @@ class X509UtilitiesTest {
@Test
fun `serialize - deserialize X509CertPath`() {
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
val context = SerializationContextImpl(kryoMagic,
val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) }
val context = SerializationContextImpl(amqpMagic,
javaClass.classLoader,
AllWhitelist,
emptyMap(),

View File

@ -40,6 +40,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationFactory
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
import net.corda.nodeapi.internal.serialization.AllWhitelist

View File

@ -15,6 +15,7 @@ import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.KryoSerializable
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.primitives.Ints
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
@ -22,9 +23,9 @@ import net.corda.core.contracts.PrivacySalt
import net.corda.core.crypto.*
import net.corda.core.internal.FetchDataFlow
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.sequence
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.nodeapi.internal.serialization.*
import net.corda.testing.core.ALICE_NAME
@ -44,6 +45,17 @@ import java.time.Instant
import java.util.*
import kotlin.test.*
class TestScheme : AbstractKryoSerializationScheme() {
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient
}
override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
}
@RunWith(Parameterized::class)
class KryoTests(private val compression: CordaSerializationEncoding?) {
companion object {
@ -58,7 +70,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
@Before
fun setup() {
factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) }
context = SerializationContextImpl(kryoMagic,
javaClass.classLoader,
AllWhitelist,
@ -86,11 +98,12 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null))
}
@Test
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
val noReferencesContext = context.withoutReferences()
val obj = Ints.toByteArray(0x01234567).sequence()
val originalList = arrayListOf(obj)
val obj : ByteSequence = Ints.toByteArray(0x01234567).sequence()
val originalList : ArrayList<ByteSequence> = arrayListOf(obj)
val deserialisedList = originalList.serialize(factory, noReferencesContext).deserialize(factory, noReferencesContext)
originalList += obj
deserialisedList += obj
@ -278,7 +291,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
}
}
Tmp()
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
val factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) }
val context = SerializationContextImpl(kryoMagic,
javaClass.classLoader,
AllWhitelist,