mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
Merge remote-tracking branch 'open/master' into colljos-merge-171117
# Conflicts: # .idea/compiler.xml # build.gradle # client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt # docs/source/changelog.rst # node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt # node/src/main/kotlin/net/corda/node/internal/StartedNode.kt # node/src/main/kotlin/net/corda/node/utilities/registration/HTTPNetworkRegistrationService.kt # samples/network-visualiser/build.gradle # samples/simm-valuation-demo/src/integration-test/kotlin/net/corda/vega/SimmValuationTest.kt # testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt # testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt # testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt
This commit is contained in:
@ -1,12 +1,12 @@
|
||||
package net.corda.nodeapi
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
|
||||
sealed class ConnectionDirection {
|
||||
data class Inbound(val acceptorFactoryClassName: String) : ConnectionDirection()
|
||||
@ -20,17 +20,18 @@ class ArtemisTcpTransport {
|
||||
companion object {
|
||||
const val VERIFY_PEER_LEGAL_NAME = "corda.verifyPeerCommonName"
|
||||
|
||||
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
|
||||
// Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work
|
||||
// in case we need to use keytool certificates in some demos
|
||||
// Restrict enabled TLS cipher suites to:
|
||||
// AES128 using Galois/Counter Mode (GCM) for the block cipher being used to encrypt the message stream.
|
||||
// SHA256 as message authentication algorithm.
|
||||
// ECDHE as key exchange algorithm. DHE is also supported if one wants to completely avoid the use of ECC for TLS.
|
||||
// ECDSA and RSA for digital signatures. Our self-generated certificates all use ECDSA for handshakes,
|
||||
// but we allow classical RSA certificates to work in case:
|
||||
// a) we need to use keytool certificates in some demos,
|
||||
// b) we use cloud providers or HSMs that do not support ECC.
|
||||
private val CIPHER_SUITES = listOf(
|
||||
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
|
||||
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256"
|
||||
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256"
|
||||
)
|
||||
|
||||
fun tcpTransport(
|
||||
@ -40,15 +41,16 @@ class ArtemisTcpTransport {
|
||||
enableSSL: Boolean = true
|
||||
): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any?>(
|
||||
// Basic TCP target details
|
||||
// Basic TCP target details.
|
||||
TransportConstants.HOST_PROP_NAME to hostAndPort.host,
|
||||
TransportConstants.PORT_PROP_NAME to hostAndPort.port,
|
||||
|
||||
// Turn on AMQP support, which needs the protocol jar on the classpath.
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
|
||||
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP"
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
|
||||
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications.
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
|
||||
TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null)
|
||||
)
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
@ -56,11 +58,11 @@ class ArtemisTcpTransport {
|
||||
config.trustStoreFile.requireOnDefaultFileSystem()
|
||||
val tlsOptions = mapOf(
|
||||
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
|
||||
// and AES encryption
|
||||
// and AES encryption.
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.sslKeystore,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password.
|
||||
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStoreFile,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
|
||||
|
@ -1,15 +1,24 @@
|
||||
package net.corda.nodeapi
|
||||
|
||||
import net.corda.core.context.Actor
|
||||
import net.corda.core.context.AuthServiceId
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.context.Trace.InvocationId
|
||||
import net.corda.core.context.Trace.SessionId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.Id
|
||||
import net.corda.core.utilities.Try
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||
import org.apache.activemq.artemis.reader.MessageUtil
|
||||
import rx.Notification
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
// The RPC protocol:
|
||||
@ -51,10 +60,6 @@ import java.util.*
|
||||
* Constants and data types used by the RPC API.
|
||||
*/
|
||||
object RPCApi {
|
||||
private val TAG_FIELD_NAME = "tag"
|
||||
private val RPC_ID_FIELD_NAME = "rpc-id"
|
||||
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
||||
private val METHOD_NAME_FIELD_NAME = "method-name"
|
||||
|
||||
/** Name of the Artemis queue on which the server receives RPC requests (as [ClientToServer.RpcRequest]). */
|
||||
const val RPC_SERVER_QUEUE_NAME = "rpc.server"
|
||||
@ -65,6 +70,7 @@ object RPCApi {
|
||||
const val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client"
|
||||
const val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals"
|
||||
const val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
|
||||
const val RPC_TARGET_LEGAL_IDENTITY = "rpc-target-legal-identity"
|
||||
|
||||
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
||||
@ -73,9 +79,6 @@ object RPCApi {
|
||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_ADDED.name}' AND " +
|
||||
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
|
||||
|
||||
data class RpcRequestId(val toLong: Long)
|
||||
data class ObservableId(val toLong: Long)
|
||||
|
||||
object RpcRequestOrObservableIdKey
|
||||
|
||||
private fun ClientMessage.getBodyAsByteArray(): ByteArray {
|
||||
@ -101,28 +104,35 @@ object RPCApi {
|
||||
*/
|
||||
data class RpcRequest(
|
||||
val clientAddress: SimpleString,
|
||||
val id: RpcRequestId,
|
||||
val methodName: String,
|
||||
val serialisedArguments: ByteArray
|
||||
val serialisedArguments: ByteArray,
|
||||
val replyId: InvocationId,
|
||||
val sessionId: SessionId,
|
||||
val externalTrace: Trace? = null,
|
||||
val impersonatedActor: Actor? = null
|
||||
) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
MessageUtil.setJMSReplyTo(message, clientAddress)
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REQUEST.ordinal)
|
||||
message.putLongProperty(RPC_ID_FIELD_NAME, id.toLong)
|
||||
|
||||
replyId.mapTo(message)
|
||||
sessionId.mapTo(message)
|
||||
|
||||
externalTrace?.mapToExternal(message)
|
||||
impersonatedActor?.mapToImpersonated(message)
|
||||
|
||||
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
|
||||
message.bodyBuffer.writeBytes(serialisedArguments)
|
||||
}
|
||||
}
|
||||
|
||||
data class ObservablesClosed(
|
||||
val ids: List<ObservableId>
|
||||
) : ClientToServer() {
|
||||
data class ObservablesClosed(val ids: List<InvocationId>) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVABLES_CLOSED.ordinal)
|
||||
val buffer = message.bodyBuffer
|
||||
buffer.writeInt(ids.size)
|
||||
ids.forEach {
|
||||
buffer.writeLong(it.toLong)
|
||||
buffer.writeInvocationId(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -133,16 +143,19 @@ object RPCApi {
|
||||
return when (tag) {
|
||||
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
|
||||
clientAddress = MessageUtil.getJMSReplyTo(message),
|
||||
id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME)),
|
||||
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
|
||||
serialisedArguments = message.getBodyAsByteArray()
|
||||
serialisedArguments = message.getBodyAsByteArray(),
|
||||
replyId = message.replyId(),
|
||||
sessionId = message.sessionId(),
|
||||
externalTrace = message.externalTrace(),
|
||||
impersonatedActor = message.impersonatedActor()
|
||||
)
|
||||
RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> {
|
||||
val ids = ArrayList<ObservableId>()
|
||||
val ids = ArrayList<InvocationId>()
|
||||
val buffer = message.bodyBuffer
|
||||
val numberOfIds = buffer.readInt()
|
||||
for (i in 1..numberOfIds) {
|
||||
ids.add(ObservableId(buffer.readLong()))
|
||||
ids.add(buffer.readInvocationId())
|
||||
}
|
||||
ObservablesClosed(ids)
|
||||
}
|
||||
@ -164,23 +177,23 @@ object RPCApi {
|
||||
|
||||
/** Reply in response to an [ClientToServer.RpcRequest]. */
|
||||
data class RpcReply(
|
||||
val id: RpcRequestId,
|
||||
val id: InvocationId,
|
||||
val result: Try<Any?>
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REPLY.ordinal)
|
||||
message.putLongProperty(RPC_ID_FIELD_NAME, id.toLong)
|
||||
id.mapTo(message, RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(result.safeSerialize(context) { Try.Failure<Any>(it) }.bytes)
|
||||
}
|
||||
}
|
||||
|
||||
data class Observation(
|
||||
val id: ObservableId,
|
||||
val id: InvocationId,
|
||||
val content: Notification<*>
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVATION.ordinal)
|
||||
message.putLongProperty(OBSERVABLE_ID_FIELD_NAME, id.toLong)
|
||||
id.mapTo(message, OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(content.safeSerialize(context) { Notification.createOnError<Void?>(it) }.bytes)
|
||||
}
|
||||
}
|
||||
@ -196,20 +209,15 @@ object RPCApi {
|
||||
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
|
||||
return when (tag) {
|
||||
RPCApi.ServerToClient.Tag.RPC_REPLY -> {
|
||||
val id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME))
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id.toLong)
|
||||
RpcReply(
|
||||
id = id,
|
||||
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
)
|
||||
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
|
||||
RpcReply(id, message.getBodyAsByteArray().deserialize(context = poolWithIdContext))
|
||||
}
|
||||
RPCApi.ServerToClient.Tag.OBSERVATION -> {
|
||||
val id = ObservableId(message.getLongProperty(OBSERVABLE_ID_FIELD_NAME))
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id.toLong)
|
||||
Observation(
|
||||
id = id,
|
||||
content = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
)
|
||||
val observableId = message.invocationId(OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, observableId)
|
||||
val payload = message.getBodyAsByteArray().deserialize<Notification<*>>(context = poolWithIdContext)
|
||||
Observation(observableId, payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -228,3 +236,108 @@ data class ArtemisConsumer(
|
||||
val session: ClientSession,
|
||||
val consumer: ClientConsumer
|
||||
)
|
||||
|
||||
private val TAG_FIELD_NAME = "tag"
|
||||
private val RPC_ID_FIELD_NAME = "rpc-id"
|
||||
private val RPC_ID_TIMESTAMP_FIELD_NAME = "rpc-id-timestamp"
|
||||
private val RPC_SESSION_ID_FIELD_NAME = "rpc-session-id"
|
||||
private val RPC_SESSION_ID_TIMESTAMP_FIELD_NAME = "rpc-session-id-timestamp"
|
||||
private val RPC_EXTERNAL_ID_FIELD_NAME = "rpc-external-id"
|
||||
private val RPC_EXTERNAL_ID_TIMESTAMP_FIELD_NAME = "rpc-external-id-timestamp"
|
||||
private val RPC_EXTERNAL_SESSION_ID_FIELD_NAME = "rpc-external-session-id"
|
||||
private val RPC_EXTERNAL_SESSION_ID_TIMESTAMP_FIELD_NAME = "rpc-external-session-id-timestamp"
|
||||
private val RPC_IMPERSONATED_ACTOR_ID = "rpc-impersonated-actor-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_STORE_ID = "rpc-impersonated-actor-store-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_OWNING_LEGAL_IDENTITY = "rpc-impersonated-actor-owningLegalIdentity"
|
||||
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
||||
private val OBSERVABLE_ID_TIMESTAMP_FIELD_NAME = "observable-id-timestamp"
|
||||
private val METHOD_NAME_FIELD_NAME = "method-name"
|
||||
|
||||
fun ClientMessage.replyId(): InvocationId {
|
||||
|
||||
return invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract reply id from client message.")
|
||||
}
|
||||
|
||||
fun ClientMessage.sessionId(): SessionId {
|
||||
|
||||
return sessionId(RPC_SESSION_ID_FIELD_NAME, RPC_SESSION_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract the session id from client message.")
|
||||
}
|
||||
|
||||
fun ClientMessage.externalTrace(): Trace? {
|
||||
|
||||
val invocationId = invocationId(RPC_EXTERNAL_ID_FIELD_NAME, RPC_EXTERNAL_ID_TIMESTAMP_FIELD_NAME)
|
||||
val sessionId = sessionId(RPC_EXTERNAL_SESSION_ID_FIELD_NAME, RPC_EXTERNAL_SESSION_ID_TIMESTAMP_FIELD_NAME)
|
||||
|
||||
return when {
|
||||
invocationId == null || sessionId == null -> null
|
||||
else -> Trace(invocationId, sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
fun ClientMessage.impersonatedActor(): Actor? {
|
||||
|
||||
return getStringProperty(RPC_IMPERSONATED_ACTOR_ID)?.let {
|
||||
val impersonatedStoreId = getStringProperty(RPC_IMPERSONATED_ACTOR_STORE_ID)
|
||||
val impersonatingOwningLegalIdentity = getStringProperty(RPC_IMPERSONATED_ACTOR_OWNING_LEGAL_IDENTITY)
|
||||
if (impersonatedStoreId == null || impersonatingOwningLegalIdentity == null) {
|
||||
throw IllegalStateException("Cannot extract impersonated actor from client message.")
|
||||
}
|
||||
Actor(Actor.Id(it), AuthServiceId(impersonatedStoreId), CordaX500Name.parse(impersonatingOwningLegalIdentity))
|
||||
}
|
||||
}
|
||||
|
||||
private fun Id<String>.mapTo(message: ClientMessage, valueProperty: String, timestampProperty: String) {
|
||||
|
||||
message.putStringProperty(valueProperty, value)
|
||||
message.putLongProperty(timestampProperty, timestamp.toEpochMilli())
|
||||
}
|
||||
|
||||
private fun ActiveMQBuffer.writeInvocationId(invocationId: InvocationId) {
|
||||
|
||||
this.writeString(invocationId.value)
|
||||
this.writeLong(invocationId.timestamp.toEpochMilli())
|
||||
}
|
||||
|
||||
private fun ActiveMQBuffer.readInvocationId() : InvocationId {
|
||||
|
||||
val value = this.readString()
|
||||
val timestamp = this.readLong()
|
||||
return InvocationId(value, Instant.ofEpochMilli(timestamp))
|
||||
}
|
||||
|
||||
private fun InvocationId.mapTo(message: ClientMessage) = mapTo(message, RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME)
|
||||
|
||||
private fun SessionId.mapTo(message: ClientMessage) = mapTo(message, RPC_SESSION_ID_FIELD_NAME, RPC_SESSION_ID_TIMESTAMP_FIELD_NAME)
|
||||
|
||||
private fun Trace.mapToExternal(message: ClientMessage) = mapTo(message, RPC_EXTERNAL_ID_FIELD_NAME, RPC_EXTERNAL_ID_TIMESTAMP_FIELD_NAME, RPC_EXTERNAL_SESSION_ID_FIELD_NAME, RPC_EXTERNAL_SESSION_ID_TIMESTAMP_FIELD_NAME)
|
||||
|
||||
private fun Actor.mapToImpersonated(message: ClientMessage) {
|
||||
|
||||
message.putStringProperty(RPC_IMPERSONATED_ACTOR_ID, this.id.value)
|
||||
message.putStringProperty(RPC_IMPERSONATED_ACTOR_STORE_ID, this.serviceId.value)
|
||||
message.putStringProperty(RPC_IMPERSONATED_ACTOR_OWNING_LEGAL_IDENTITY, this.owningLegalIdentity.toString())
|
||||
}
|
||||
|
||||
private fun Trace.mapTo(message: ClientMessage, valueProperty: String, timestampProperty: String, sessionValueProperty: String, sessionTimestampProperty: String) = apply {
|
||||
|
||||
invocationId.apply {
|
||||
message.putStringProperty(valueProperty, value)
|
||||
message.putLongProperty(timestampProperty, timestamp.toEpochMilli())
|
||||
}
|
||||
sessionId.apply {
|
||||
message.putStringProperty(sessionValueProperty, value)
|
||||
message.putLongProperty(sessionTimestampProperty, timestamp.toEpochMilli())
|
||||
}
|
||||
}
|
||||
|
||||
private fun ClientMessage.invocationId(valueProperty: String, timestampProperty: String): InvocationId? = id(valueProperty, timestampProperty, ::InvocationId)
|
||||
|
||||
private fun ClientMessage.sessionId(valueProperty: String, timestampProperty: String): SessionId? = id(valueProperty, timestampProperty, ::SessionId)
|
||||
|
||||
private fun <ID : Id<*>> ClientMessage.id(valueProperty: String, timestampProperty: String, construct: (value: String, timestamp: Instant) -> ID): ID? {
|
||||
|
||||
// returning null because getLongProperty throws trying to convert null to long
|
||||
val idRaw = this.getStringProperty(valueProperty) ?: return null
|
||||
val timestampRaw = this.getLongProperty(timestampProperty)
|
||||
return construct(idRaw, Instant.ofEpochMilli(timestampRaw))
|
||||
}
|
@ -1,19 +1,17 @@
|
||||
package net.corda.nodeapi
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and SSL transport configuration.
|
||||
*/
|
||||
abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
class ArtemisMessagingComponent {
|
||||
companion object {
|
||||
init {
|
||||
System.setProperty("org.jboss.logging.provider", "slf4j")
|
||||
@ -23,13 +21,10 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
// case is a forward slash
|
||||
const val NODE_USER = "SystemUsers/Node"
|
||||
const val PEER_USER = "SystemUsers/Peer"
|
||||
|
||||
const val INTERNAL_PREFIX = "internal."
|
||||
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
|
||||
const val IP_REQUEST_PREFIX = "ip."
|
||||
const val P2P_QUEUE = "p2p.inbound"
|
||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||
const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap"
|
||||
}
|
||||
|
||||
interface ArtemisAddress : MessageRecipients {
|
||||
@ -69,7 +64,4 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup {
|
||||
override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}"
|
||||
}
|
||||
|
||||
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
|
||||
abstract val config: SSLConfiguration?
|
||||
}
|
@ -36,20 +36,6 @@ val KRYO_STORAGE_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
true,
|
||||
SerializationContext.UseCase.Storage)
|
||||
|
||||
val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
QuasarWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Checkpoint)
|
||||
|
||||
|
||||
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
@ -58,17 +44,9 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
true,
|
||||
SerializationContext.UseCase.Storage)
|
||||
|
||||
val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCServer)
|
||||
|
||||
|
@ -0,0 +1,39 @@
|
||||
@file:JvmName("SharedContexts")
|
||||
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
|
||||
/*
|
||||
* Serialisation contexts shared by the server and client.
|
||||
*
|
||||
* NOTE: The [KRYO_STORAGE_CONTEXT] and [AMQP_STORAGE_CONTEXT]
|
||||
* CANNOT always be instantiated outside of the server and so
|
||||
* MUST be kept separate from these ones!
|
||||
*/
|
||||
|
||||
val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
QuasarWhitelist,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Checkpoint)
|
||||
|
||||
val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
@ -28,47 +29,50 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
|
||||
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
|
||||
}
|
||||
}
|
||||
|
||||
fun registerCustomSerializers(factory: SerializerFactory) {
|
||||
with(factory) {
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PrivateKeySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ThrowableSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.X500NameSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BigDecimalSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.CurrencySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OpaqueBytesSubSequenceSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InstantSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.DurationSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalDateSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ZonedDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ZoneIdSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OffsetTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OffsetDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.YearSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.YearMonthSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.MonthDaySerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PeriodSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CertificateHolderSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PartyAndCertificateSerializer(factory))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
|
||||
}
|
||||
for (whitelistProvider in serializationWhitelists)
|
||||
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
||||
private fun registerCustomSerializers(factory: SerializerFactory) {
|
||||
with(factory) {
|
||||
register(publicKeySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PrivateKeySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ThrowableSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.X500NameSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BigDecimalSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.CurrencySerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OpaqueBytesSubSequenceSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InstantSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.DurationSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalDateSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.LocalTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ZonedDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ZoneIdSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OffsetTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.OffsetDateTimeSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.YearSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.YearMonthSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.MonthDaySerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PeriodSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CertificateHolderSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.PartyAndCertificateSerializer(factory))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer)
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this))
|
||||
}
|
||||
for (whitelistProvider in serializationWhitelists)
|
||||
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
||||
}
|
||||
|
||||
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
|
||||
|
||||
protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory
|
||||
protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory
|
||||
open protected val publicKeySerializer: CustomSerializer.Implements<PublicKey>
|
||||
= net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer
|
||||
|
||||
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
|
@ -0,0 +1,32 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp.custom
|
||||
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.contracts.ContractAttachment
|
||||
import net.corda.core.contracts.ContractClassName
|
||||
import net.corda.core.serialization.MissingAttachmentsException
|
||||
import net.corda.nodeapi.internal.serialization.GeneratedAttachment
|
||||
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
|
||||
/**
|
||||
* A serializer for [ContractAttachment] that uses a proxy object to write out the full attachment eagerly.
|
||||
* @param factory the serializerFactory
|
||||
*/
|
||||
class ContractAttachmentSerializer(factory: SerializerFactory) : CustomSerializer.Proxy<ContractAttachment,
|
||||
ContractAttachmentSerializer.ContractAttachmentProxy>(ContractAttachment::class.java,
|
||||
ContractAttachmentProxy::class.java, factory) {
|
||||
override fun toProxy(obj: ContractAttachment): ContractAttachmentProxy {
|
||||
val bytes = try {
|
||||
obj.attachment.open().readBytes()
|
||||
} catch (e: Exception) {
|
||||
throw MissingAttachmentsException(listOf(obj.id))
|
||||
}
|
||||
return ContractAttachmentProxy(GeneratedAttachment(bytes), obj.contract)
|
||||
}
|
||||
|
||||
override fun fromProxy(proxy: ContractAttachmentProxy): ContractAttachment {
|
||||
return ContractAttachment(proxy.attachment, proxy.contract)
|
||||
}
|
||||
|
||||
data class ContractAttachmentProxy(val attachment: Attachment, val contract: ContractClassName)
|
||||
}
|
@ -14,7 +14,10 @@ import de.javakaffee.kryoserializers.guava.*
|
||||
import net.corda.core.contracts.ContractAttachment
|
||||
import net.corda.core.contracts.PrivacySalt
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.AbstractAttachment
|
||||
import net.corda.core.serialization.MissingAttachmentsException
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
@ -48,6 +51,7 @@ import java.io.ByteArrayOutputStream
|
||||
import java.io.FileInputStream
|
||||
import java.io.InputStream
|
||||
import java.lang.reflect.Modifier.isPublic
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
import java.util.*
|
||||
import kotlin.collections.ArrayList
|
||||
@ -57,7 +61,7 @@ object DefaultKryoCustomizer {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this.javaClass.classLoader).toList() + DefaultWhitelist
|
||||
}
|
||||
|
||||
fun customize(kryo: Kryo): Kryo {
|
||||
fun customize(kryo: Kryo, publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer): Kryo {
|
||||
return kryo.apply {
|
||||
// Store a little schema of field names in the stream the first time a class is used which increases tolerance
|
||||
// for change to a class.
|
||||
@ -92,10 +96,10 @@ object DefaultKryoCustomizer {
|
||||
register(BufferedInputStream::class.java, InputStreamSerializer)
|
||||
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
|
||||
noReferencesWithin<WireTransaction>()
|
||||
register(ECPublicKeyImpl::class.java, PublicKeySerializer)
|
||||
register(EdDSAPublicKey::class.java, PublicKeySerializer)
|
||||
register(ECPublicKeyImpl::class.java, publicKeySerializer)
|
||||
register(EdDSAPublicKey::class.java, publicKeySerializer)
|
||||
register(EdDSAPrivateKey::class.java, PrivateKeySerializer)
|
||||
register(CompositeKey::class.java, PublicKeySerializer) // Using a custom serializer for compactness
|
||||
register(CompositeKey::class.java, publicKeySerializer) // Using a custom serializer for compactness
|
||||
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
|
||||
register(Array<StackTraceElement>::class, read = { _, _ -> emptyArray() }, write = { _, _, _ -> })
|
||||
// This ensures a NonEmptySetSerializer is constructed with an initial value.
|
||||
@ -108,11 +112,11 @@ object DefaultKryoCustomizer {
|
||||
register(X500Name::class.java, X500NameSerializer)
|
||||
register(X509CertificateHolder::class.java, X509CertificateSerializer)
|
||||
register(BCECPrivateKey::class.java, PrivateKeySerializer)
|
||||
register(BCECPublicKey::class.java, PublicKeySerializer)
|
||||
register(BCECPublicKey::class.java, publicKeySerializer)
|
||||
register(BCRSAPrivateCrtKey::class.java, PrivateKeySerializer)
|
||||
register(BCRSAPublicKey::class.java, PublicKeySerializer)
|
||||
register(BCRSAPublicKey::class.java, publicKeySerializer)
|
||||
register(BCSphincs256PrivateKey::class.java, PrivateKeySerializer)
|
||||
register(BCSphincs256PublicKey::class.java, PublicKeySerializer)
|
||||
register(BCSphincs256PublicKey::class.java, publicKeySerializer)
|
||||
register(NotaryChangeWireTransaction::class.java, NotaryChangeWireTransactionSerializer)
|
||||
register(PartyAndCertificate::class.java, PartyAndCertificateSerializer)
|
||||
|
||||
@ -196,15 +200,38 @@ object DefaultKryoCustomizer {
|
||||
|
||||
private object ContractAttachmentSerializer : Serializer<ContractAttachment>() {
|
||||
override fun write(kryo: Kryo, output: Output, obj: ContractAttachment) {
|
||||
val buffer = ByteArrayOutputStream()
|
||||
obj.attachment.open().use { it.copyTo(buffer) }
|
||||
output.writeBytesWithLength(buffer.toByteArray())
|
||||
if (kryo.serializationContext() != null) {
|
||||
output.writeBytes(obj.attachment.id.bytes)
|
||||
} else {
|
||||
val buffer = ByteArrayOutputStream()
|
||||
obj.attachment.open().use { it.copyTo(buffer) }
|
||||
output.writeBytesWithLength(buffer.toByteArray())
|
||||
}
|
||||
output.writeString(obj.contract)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<ContractAttachment>): ContractAttachment {
|
||||
val attachment = GeneratedAttachment(input.readBytesWithLength())
|
||||
return ContractAttachment(attachment, input.readString())
|
||||
if (kryo.serializationContext() != null) {
|
||||
val attachmentHash = SecureHash.SHA256(input.readBytes(32))
|
||||
val contract = input.readString()
|
||||
|
||||
val context = kryo.serializationContext()!!
|
||||
val attachmentStorage = context.serviceHub.attachments
|
||||
|
||||
val lazyAttachment = object : AbstractAttachment({
|
||||
val attachment = attachmentStorage.openAttachment(attachmentHash) ?: throw MissingAttachmentsException(listOf(attachmentHash))
|
||||
attachment.open().readBytes()
|
||||
}) {
|
||||
override val id = attachmentHash
|
||||
}
|
||||
|
||||
return ContractAttachment(lazyAttachment, contract)
|
||||
} else {
|
||||
val attachment = GeneratedAttachment(input.readBytesWithLength())
|
||||
val contract = input.readString()
|
||||
|
||||
return ContractAttachment(attachment, contract)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -18,6 +18,7 @@ import net.corda.core.serialization.*
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.nodeapi.internal.serialization.CordaClassResolver
|
||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||
import java.security.PublicKey
|
||||
|
||||
// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB
|
||||
val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray(Charsets.UTF_8))
|
||||
@ -39,6 +40,9 @@ abstract class AbstractKryoSerializationScheme : SerializationScheme {
|
||||
protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool
|
||||
protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool
|
||||
|
||||
// this can be overriden in derived serialization schemes
|
||||
open protected val publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer
|
||||
|
||||
private fun getPool(context: SerializationContext): KryoPool {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
when (context.useCase) {
|
||||
@ -50,6 +54,7 @@ abstract class AbstractKryoSerializationScheme : SerializationScheme {
|
||||
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
|
||||
serializer.kryo.apply {
|
||||
field.set(this, classResolver)
|
||||
// don't allow overriding the public key serializer for checkpointing
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
|
||||
@ -62,7 +67,7 @@ abstract class AbstractKryoSerializationScheme : SerializationScheme {
|
||||
rpcServerKryoPool(context)
|
||||
else ->
|
||||
KryoPool.Builder {
|
||||
DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context))).apply { classLoader = it.second }
|
||||
DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context)), publicKeySerializer).apply { classLoader = it.second }
|
||||
}.build()
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,12 @@ import static org.assertj.core.api.ThrowableAssert.catchThrowable;
|
||||
|
||||
public final class ForbiddenLambdaSerializationTests {
|
||||
@Rule
|
||||
public SerializationEnvironmentRule testSerialization = new SerializationEnvironmentRule();
|
||||
public final SerializationEnvironmentRule testSerialization = new SerializationEnvironmentRule();
|
||||
private SerializationFactory factory;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
factory = testSerialization.env.getSERIALIZATION_FACTORY();
|
||||
factory = testSerialization.getEnv().getSerializationFactory();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -19,13 +19,13 @@ import static org.assertj.core.api.ThrowableAssert.catchThrowable;
|
||||
|
||||
public final class LambdaCheckpointSerializationTest {
|
||||
@Rule
|
||||
public SerializationEnvironmentRule testSerialization = new SerializationEnvironmentRule();
|
||||
public final SerializationEnvironmentRule testSerialization = new SerializationEnvironmentRule();
|
||||
private SerializationFactory factory;
|
||||
private SerializationContext context;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
factory = testSerialization.env.getSERIALIZATION_FACTORY();
|
||||
factory = testSerialization.getEnv().getSerializationFactory();
|
||||
context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,102 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import net.corda.core.contracts.ContractAttachment
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class ContractAttachmentSerializerTest {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private lateinit var factory: SerializationFactory
|
||||
private lateinit var context: SerializationContext
|
||||
private lateinit var contextWithToken: SerializationContext
|
||||
|
||||
private val mockServices = MockServices()
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
factory = testSerialization.env.serializationFactory
|
||||
context = testSerialization.env.checkpointContext
|
||||
contextWithToken = context.withTokenContext(SerializeAsTokenContextImpl(Any(), factory, context, mockServices))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `write contract attachment and read it back`() {
|
||||
val contractAttachment = ContractAttachment(GeneratedAttachment(ByteArray(0)), DummyContract.PROGRAM_ID)
|
||||
// no token context so will serialize the whole attachment
|
||||
val serialized = contractAttachment.serialize(factory, context)
|
||||
val deserialized = serialized.deserialize(factory, context)
|
||||
|
||||
assertEquals(contractAttachment.id, deserialized.attachment.id)
|
||||
assertEquals(contractAttachment.contract, deserialized.contract)
|
||||
assertArrayEquals(contractAttachment.open().readBytes(), deserialized.open().readBytes())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `write contract attachment and read it back using token context`() {
|
||||
val attachment = GeneratedAttachment("test".toByteArray())
|
||||
|
||||
mockServices.attachments.importAttachment(attachment.open())
|
||||
|
||||
val contractAttachment = ContractAttachment(attachment, DummyContract.PROGRAM_ID)
|
||||
val serialized = contractAttachment.serialize(factory, contextWithToken)
|
||||
val deserialized = serialized.deserialize(factory, contextWithToken)
|
||||
|
||||
assertEquals(contractAttachment.id, deserialized.attachment.id)
|
||||
assertEquals(contractAttachment.contract, deserialized.contract)
|
||||
assertArrayEquals(contractAttachment.open().readBytes(), deserialized.open().readBytes())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check only serialize attachment id and contract class name when using token context`() {
|
||||
val largeAttachmentSize = 1024 * 1024
|
||||
val attachment = GeneratedAttachment(ByteArray(largeAttachmentSize))
|
||||
|
||||
mockServices.attachments.importAttachment(attachment.open())
|
||||
|
||||
val contractAttachment = ContractAttachment(attachment, DummyContract.PROGRAM_ID)
|
||||
val serialized = contractAttachment.serialize(factory, contextWithToken)
|
||||
|
||||
assertThat(serialized.size).isLessThan(largeAttachmentSize)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `throws when missing attachment when using token context`() {
|
||||
val attachment = GeneratedAttachment("test".toByteArray())
|
||||
|
||||
// don't importAttachment in mockService
|
||||
|
||||
val contractAttachment = ContractAttachment(attachment, DummyContract.PROGRAM_ID)
|
||||
val serialized = contractAttachment.serialize(factory, contextWithToken)
|
||||
val deserialized = serialized.deserialize(factory, contextWithToken)
|
||||
|
||||
assertThatThrownBy { deserialized.attachment.open() }.isInstanceOf(MissingAttachmentsException::class.java)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check attachment in deserialize is lazy loaded when using token context`() {
|
||||
val attachment = GeneratedAttachment(ByteArray(0))
|
||||
|
||||
// don't importAttachment in mockService
|
||||
|
||||
val contractAttachment = ContractAttachment(attachment, DummyContract.PROGRAM_ID)
|
||||
val serialized = contractAttachment.serialize(factory, contextWithToken)
|
||||
serialized.deserialize(factory, contextWithToken)
|
||||
|
||||
// MissingAttachmentsException thrown if we try to open attachment
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,6 @@ import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaKryo
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.TestDependencyInjectionBase
|
||||
import net.corda.testing.rigorousMock
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
@ -26,8 +25,8 @@ class SerializationTokenTest {
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
factory = testSerialization.env.SERIALIZATION_FACTORY
|
||||
context = testSerialization.env.CHECKPOINT_CONTEXT.withWhitelisted(SingletonSerializationToken::class.java)
|
||||
factory = testSerialization.env.serializationFactory
|
||||
context = testSerialization.env.checkpointContext.withWhitelisted(SingletonSerializationToken::class.java)
|
||||
}
|
||||
|
||||
// Large tokenizable object so we can tell from the smaller number of serialized bytes it was actually tokenized
|
||||
|
@ -0,0 +1,65 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||
import org.apache.qpid.proton.codec.Data
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.Test
|
||||
import java.lang.reflect.Type
|
||||
import java.security.PublicKey
|
||||
|
||||
class OverridePKSerializerTest {
|
||||
class SerializerTestException(message: String) : Exception(message)
|
||||
|
||||
class TestPublicKeySerializer : CustomSerializer.Implements<PublicKey>(PublicKey::class.java) {
|
||||
override fun writeDescribedObject(obj: PublicKey, data: Data, type: Type, output: SerializationOutput) {
|
||||
throw SerializerTestException("Custom write call")
|
||||
}
|
||||
|
||||
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): PublicKey {
|
||||
throw SerializerTestException("Custom read call")
|
||||
}
|
||||
|
||||
override val schemaForDocumentation: Schema
|
||||
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean = true
|
||||
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override val publicKeySerializer = TestPublicKeySerializer()
|
||||
}
|
||||
|
||||
class TestPublicKey : PublicKey {
|
||||
override fun getAlgorithm(): String {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun getEncoded(): ByteArray {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun getFormat(): String {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test publicKeySerializer is overridden`() {
|
||||
val scheme = AMQPTestSerializationScheme()
|
||||
val key = TestPublicKey()
|
||||
|
||||
Assertions
|
||||
.assertThatThrownBy { scheme.serialize(key, AMQP_P2P_CONTEXT) }
|
||||
.hasMessageMatching("Custom write call")
|
||||
}
|
||||
}
|
@ -1,10 +1,6 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.identity.AbstractParty
|
||||
@ -13,19 +9,26 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationFactory
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.internal.AbstractAttachment
|
||||
import net.corda.core.serialization.MissingAttachmentsException
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.EmptyWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.GeneratedAttachment
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.isPrimitive
|
||||
import net.corda.testing.BOB_IDENTITY
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MEGA_CORP_PUBKEY
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.withTestSerialization
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.qpid.proton.amqp.*
|
||||
import org.apache.qpid.proton.codec.DecoderImpl
|
||||
import org.apache.qpid.proton.codec.EncoderImpl
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Assert.assertNotSame
|
||||
import org.junit.Assert.assertSame
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.io.ByteArrayInputStream
|
||||
@ -36,6 +39,10 @@ import java.nio.ByteBuffer
|
||||
import java.time.*
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
import kotlin.reflect.full.declaredFunctions
|
||||
import kotlin.reflect.full.declaredMemberFunctions
|
||||
import kotlin.reflect.full.superclasses
|
||||
import kotlin.reflect.jvm.javaMethod
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertTrue
|
||||
@ -557,11 +564,16 @@ class SerializationOutputTests {
|
||||
fun `test transaction state`() {
|
||||
val state = TransactionState(FooState(), FOO_PROGRAM_ID, MEGA_CORP)
|
||||
|
||||
val scheme = AMQPServerSerializationScheme()
|
||||
val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" }
|
||||
.java.getDeclaredMethod("registerCustomSerializers", SerializerFactory::class.java)
|
||||
func.isAccessible = true
|
||||
|
||||
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
AbstractAMQPSerializationScheme.registerCustomSerializers(factory)
|
||||
func.invoke(scheme, factory)
|
||||
|
||||
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
AbstractAMQPSerializationScheme.registerCustomSerializers(factory2)
|
||||
func.invoke(scheme, factory2)
|
||||
|
||||
val desState = serdes(state, factory, factory2, expectedEqual = false, expectDeserializedEqual = false)
|
||||
assertTrue((desState as TransactionState<*>).data is FooState)
|
||||
@ -984,4 +996,36 @@ class SerializationOutputTests {
|
||||
obj[Month.AUGUST] = Month.AUGUST.value
|
||||
serdes(obj)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test contract attachment serialize`() {
|
||||
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory.register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(factory))
|
||||
|
||||
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory2.register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(factory2))
|
||||
|
||||
val obj = ContractAttachment(GeneratedAttachment("test".toByteArray()), DummyContract.PROGRAM_ID)
|
||||
val obj2 = serdes(obj, factory, factory2, expectedEqual = false, expectDeserializedEqual = false)
|
||||
assertEquals(obj.id, obj2.attachment.id)
|
||||
assertEquals(obj.contract, obj2.contract)
|
||||
assertArrayEquals(obj.open().readBytes(), obj2.open().readBytes())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `test contract attachment throws if missing attachment`() {
|
||||
val factory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory.register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(factory))
|
||||
|
||||
val factory2 = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())
|
||||
factory2.register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(factory2))
|
||||
|
||||
val obj = ContractAttachment(object : AbstractAttachment({ throw Exception() }) {
|
||||
override val id = SecureHash.zeroHash
|
||||
}, DummyContract.PROGRAM_ID)
|
||||
|
||||
assertThatThrownBy {
|
||||
serdes(obj, factory, factory2, expectedEqual = false, expectDeserializedEqual = false)
|
||||
}.isInstanceOf(MissingAttachmentsException::class.java)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user