mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
CORDA-973 Refactoring for serialization compression support (#2466)
* Use constant for empty byte array * Less byte array copying * Fix InputStreamSerializer trailing garbage * More OO kryo streams * Introduce SerializationMagic * Introduce non-copying slice on ByteSequence
This commit is contained in:
@ -2,16 +2,16 @@ package net.corda.node.serialization
|
||||
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||
import net.corda.node.services.messaging.RpcServerObservableSerializer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||
|
||||
class KryoServerSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
|
||||
return byteSequence == KryoHeaderV0_1 && target != SerializationContext.UseCase.RPCClient
|
||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||
return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient
|
||||
}
|
||||
|
||||
override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
@ -16,6 +16,7 @@ import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import java.security.InvalidAlgorithmParameterException
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.*
|
||||
@ -73,7 +74,7 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
|
||||
|
||||
@Lob
|
||||
@Column(name = "identity_value")
|
||||
var identity: ByteArray = ByteArray(0)
|
||||
var identity: ByteArray = EMPTY_BYTE_ARRAY
|
||||
)
|
||||
|
||||
@Entity
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.bouncycastle.operator.ContentSigner
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
@ -37,11 +38,10 @@ class PersistentKeyManagementService(val identityService: IdentityServiceInterna
|
||||
|
||||
@Lob
|
||||
@Column(name = "public_key")
|
||||
var publicKey: ByteArray = ByteArray(0),
|
||||
|
||||
var publicKey: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
@Lob
|
||||
@Column(name = "private_key")
|
||||
var privateKey: ByteArray = ByteArray(0)
|
||||
var privateKey: ByteArray = EMPTY_BYTE_ARRAY
|
||||
) {
|
||||
constructor(publicKey: PublicKey, privateKey: PrivateKey)
|
||||
: this(publicKey.toStringShort(), publicKey.encoded, privateKey.encoded)
|
||||
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import rx.Subscription
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
@ -196,11 +197,10 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
|
||||
@Lob
|
||||
@Column
|
||||
var message: ByteArray = ByteArray(0),
|
||||
|
||||
var message: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
@Lob
|
||||
@Column
|
||||
var recipients: ByteArray = ByteArray(0)
|
||||
var recipients: ByteArray = EMPTY_BYTE_ARRAY
|
||||
)
|
||||
|
||||
fun start() {
|
||||
|
@ -5,6 +5,7 @@ import net.corda.node.services.api.Checkpoint
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
@ -24,13 +25,13 @@ class DBCheckpointStorage : CheckpointStorage {
|
||||
|
||||
@Lob
|
||||
@Column(name = "checkpoint_value")
|
||||
var checkpoint: ByteArray = ByteArray(0)
|
||||
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY
|
||||
)
|
||||
|
||||
override fun addCheckpoint(checkpoint: Checkpoint) {
|
||||
currentDBSession().save(DBCheckpoint().apply {
|
||||
checkpointId = checkpoint.id.toString()
|
||||
this.checkpoint = checkpoint.serializedFiber.bytes
|
||||
this.checkpoint = checkpoint.serializedFiber.bytes // XXX: Is copying the byte array necessary?
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@ import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.*
|
||||
@ -34,7 +35,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
|
||||
|
||||
@Lob
|
||||
@Column(name = "transaction_value")
|
||||
var transaction: ByteArray = ByteArray(0)
|
||||
var transaction: ByteArray = EMPTY_BYTE_ARRAY
|
||||
)
|
||||
|
||||
private companion object {
|
||||
|
@ -13,6 +13,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.hibernate.annotations.Type
|
||||
import java.io.Serializable
|
||||
import java.util.*
|
||||
@ -45,7 +46,7 @@ class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsTok
|
||||
|
||||
@Column(name = "requesting_party_key", length = 255)
|
||||
@Type(type = "corda-wrapper-binary")
|
||||
var owningKey: ByteArray = ByteArray(0)
|
||||
var owningKey: ByteArray = EMPTY_BYTE_ARRAY
|
||||
) : Serializable
|
||||
|
||||
@Entity
|
||||
|
@ -33,6 +33,7 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -76,8 +77,7 @@ class RaftUniquenessProvider(private val transportConfiguration: NodeSSLConfigur
|
||||
|
||||
@Lob
|
||||
@Column(name = "state_value")
|
||||
var value: ByteArray = ByteArray(0),
|
||||
|
||||
var value: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
@Column(name = "state_index")
|
||||
var index: Long = 0
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ import net.corda.node.services.messaging.TopicStringValidator
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -93,9 +94,8 @@ class InMemoryMessagingTests {
|
||||
node1.network.addMessageHandler("valid_message") { _, _ ->
|
||||
received++
|
||||
}
|
||||
|
||||
val invalidMessage = node2.network.createMessage("invalid_message", data = ByteArray(0))
|
||||
val validMessage = node2.network.createMessage("valid_message", data = ByteArray(0))
|
||||
val invalidMessage = node2.network.createMessage("invalid_message", data = EMPTY_BYTE_ARRAY)
|
||||
val validMessage = node2.network.createMessage("valid_message", data = EMPTY_BYTE_ARRAY)
|
||||
node2.network.send(invalidMessage, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
assertEquals(0, received)
|
||||
@ -106,8 +106,8 @@ class InMemoryMessagingTests {
|
||||
|
||||
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
|
||||
// this would fail. Make fresh messages to stop duplicate uniqueMessageId causing drops
|
||||
val invalidMessage2 = node2.network.createMessage("invalid_message", data = ByteArray(0))
|
||||
val validMessage2 = node2.network.createMessage("valid_message", data = ByteArray(0))
|
||||
val invalidMessage2 = node2.network.createMessage("invalid_message", data = EMPTY_BYTE_ARRAY)
|
||||
val validMessage2 = node2.network.createMessage("valid_message", data = EMPTY_BYTE_ARRAY)
|
||||
node2.network.send(invalidMessage2, node1.network.myAddress)
|
||||
node2.network.send(validMessage2, node1.network.myAddress)
|
||||
mockNet.runNetwork()
|
||||
|
Reference in New Issue
Block a user