mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Merge remote-tracking branch 'open/master' into andrius/os-merge
This commit is contained in:
@ -44,7 +44,7 @@ dependencies {
|
||||
compile "de.javakaffee:kryo-serializers:0.41"
|
||||
|
||||
// For AMQP serialisation.
|
||||
compile "org.apache.qpid:proton-j:0.21.0"
|
||||
compile "org.apache.qpid:proton-j:0.27.1"
|
||||
|
||||
// SQL connection pooling library
|
||||
compile "com.zaxxer:HikariCP:$hikari_version"
|
||||
|
@ -2,10 +2,12 @@ package net.corda.nodeapi.exceptions
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.flows.FlowException
|
||||
import java.io.InvalidClassException
|
||||
|
||||
// could change to use package name matching but trying to avoid reflection for now
|
||||
private val whitelisted = setOf(
|
||||
FlowException::class,
|
||||
InvalidClassException::class,
|
||||
RpcSerializableError::class,
|
||||
TransactionVerificationException::class
|
||||
@ -23,7 +25,6 @@ class InternalNodeException(message: String) : CordaRuntimeException(message) {
|
||||
fun defaultMessage(): String = DEFAULT_MESSAGE
|
||||
|
||||
fun obfuscateIfInternal(wrapped: Throwable): Throwable {
|
||||
|
||||
(wrapped as? CordaRuntimeException)?.setCause(null)
|
||||
return when {
|
||||
whitelisted.any { it.isInstance(wrapped) } -> wrapped
|
||||
|
@ -142,7 +142,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
val properties = HashMap<String, Any?>()
|
||||
for (key in P2PMessagingHeaders.whitelistedHeaders) {
|
||||
if (artemisMessage.containsProperty(key)) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
|
@ -36,7 +36,8 @@ data class DatabaseConfig(
|
||||
val transactionIsolationLevel: TransactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ,
|
||||
val schema: String? = null,
|
||||
val exportHibernateJMXStatistics: Boolean = false,
|
||||
val hibernateDialect: String? = null
|
||||
val hibernateDialect: String? = null,
|
||||
val mappedSchemaCacheSize: Long = 100
|
||||
)
|
||||
|
||||
// This class forms part of the node config and so any changes to it must be handled with care
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
package net.corda.nodeapi.internal.persistence
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.internal.castIfPossible
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -30,7 +31,6 @@ import org.hibernate.type.descriptor.sql.BlobTypeDescriptor
|
||||
import org.hibernate.type.descriptor.sql.VarbinaryTypeDescriptor
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.sql.Connection
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import javax.management.ObjectName
|
||||
import javax.persistence.AttributeConverter
|
||||
|
||||
@ -61,8 +61,7 @@ class HibernateConfiguration(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
|
||||
private val sessionFactories = ConcurrentHashMap<Set<MappedSchema>, SessionFactory>()
|
||||
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).build<Set<MappedSchema>, SessionFactory>()
|
||||
|
||||
val sessionFactoryForRegisteredSchemas = schemas.let {
|
||||
logger.info("Init HibernateConfiguration for schemas: $it")
|
||||
@ -70,7 +69,7 @@ class HibernateConfiguration(
|
||||
}
|
||||
|
||||
/** @param key must be immutable, not just read-only. */
|
||||
fun sessionFactoryForSchemas(key: Set<MappedSchema>) = sessionFactories.computeIfAbsent(key, { makeSessionFactoryForSchemas(key) })
|
||||
fun sessionFactoryForSchemas(key: Set<MappedSchema>): SessionFactory = sessionFactories.get(key, ::makeSessionFactoryForSchemas)!!
|
||||
|
||||
private fun makeSessionFactoryForSchemas(schemas: Set<MappedSchema>): SessionFactory {
|
||||
logger.info("Creating session factory for schemas: $schemas")
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.nodeapi.internal.protonwrapper.engine
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import org.apache.qpid.proton.codec.ReadableBuffer
|
||||
import org.apache.qpid.proton.codec.WritableBuffer
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
@ -67,6 +68,10 @@ internal class NettyWritable(val nettyBuffer: ByteBuf) : WritableBuffer {
|
||||
nettyBuffer.writeBytes(payload)
|
||||
}
|
||||
|
||||
override fun put(payload: ReadableBuffer) {
|
||||
nettyBuffer.writeBytes(payload.byteBuffer())
|
||||
}
|
||||
|
||||
override fun limit(): Int {
|
||||
return nettyBuffer.capacity()
|
||||
}
|
||||
|
@ -20,5 +20,5 @@ interface ApplicationMessage {
|
||||
val topic: String
|
||||
val destinationLegalName: String
|
||||
val destinationLink: NetworkHostAndPort
|
||||
val applicationProperties: Map<Any?, Any?>
|
||||
val applicationProperties: Map<String, Any?>
|
||||
}
|
@ -26,7 +26,7 @@ internal class ReceivedMessageImpl(override val payload: ByteArray,
|
||||
override val sourceLink: NetworkHostAndPort,
|
||||
override val destinationLegalName: String,
|
||||
override val destinationLink: NetworkHostAndPort,
|
||||
override val applicationProperties: Map<Any?, Any?>,
|
||||
override val applicationProperties: Map<String, Any?>,
|
||||
private val channel: Channel,
|
||||
private val delivery: Delivery) : ReceivedMessage {
|
||||
data class MessageCompleter(val status: MessageStatus, val delivery: Delivery)
|
||||
|
@ -25,7 +25,7 @@ internal class SendableMessageImpl(override val payload: ByteArray,
|
||||
override val topic: String,
|
||||
override val destinationLegalName: String,
|
||||
override val destinationLink: NetworkHostAndPort,
|
||||
override val applicationProperties: Map<Any?, Any?>) : SendableMessage {
|
||||
override val applicationProperties: Map<String, Any?>) : SendableMessage {
|
||||
var buf: ByteBuf? = null
|
||||
@Volatile
|
||||
var status: MessageStatus = MessageStatus.Unsent
|
||||
|
@ -216,7 +216,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
fun createMessage(payload: ByteArray,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
properties: Map<Any?, Any?>): SendableMessage {
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
|
||||
}
|
||||
|
||||
|
@ -165,7 +165,7 @@ class AMQPServer(val hostName: String,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
destinationLink: NetworkHostAndPort,
|
||||
properties: Map<Any?, Any?>): SendableMessage {
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
|
||||
require(dest in clientChannels.keys) {
|
||||
"Destination not available"
|
||||
|
@ -490,10 +490,10 @@ internal fun Type.asParameterizedType(): ParameterizedType {
|
||||
}
|
||||
|
||||
internal fun Type.isSubClassOf(type: Type): Boolean {
|
||||
return TypeToken.of(this).isSubtypeOf(type)
|
||||
return TypeToken.of(this).isSubtypeOf(TypeToken.of(type).rawType)
|
||||
}
|
||||
|
||||
// ByteArrays, primtives and boxed primitives are not stored in the object history
|
||||
// ByteArrays, primitives and boxed primitives are not stored in the object history
|
||||
internal fun suitableForObjectReference(type: Type): Boolean {
|
||||
val clazz = type.asClass()
|
||||
return type != ByteArray::class.java && (clazz != null && !clazz.isPrimitive && !Primitives.unwrap(clazz).isPrimitive)
|
||||
|
@ -18,7 +18,7 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.createDevKeyStores
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
|
@ -1320,5 +1320,30 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
C(12).serializeE()
|
||||
}.withMessageContaining("has synthetic fields and is likely a nested inner class")
|
||||
}
|
||||
|
||||
interface DataClassByInterface<V> {
|
||||
val v : V
|
||||
}
|
||||
|
||||
@Test
|
||||
fun dataClassBy() {
|
||||
data class C (val s: String) : DataClassByInterface<String> {
|
||||
override val v: String = "-- $s"
|
||||
}
|
||||
|
||||
data class Inner<T>(val wrapped: DataClassByInterface<T>) : DataClassByInterface<T> by wrapped {
|
||||
override val v = wrapped.v
|
||||
}
|
||||
|
||||
val i = Inner(C("hello"))
|
||||
|
||||
val bytes = SerializationOutput(testDefaultFactory()).serialize(i)
|
||||
|
||||
try {
|
||||
val i2 = DeserializationInput(testDefaultFactory()).deserialize(bytes)
|
||||
} catch (e : NotSerializableException) {
|
||||
throw Error ("Deserializing serialized \$C should not throw")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
|
Reference in New Issue
Block a user