mirror of
https://github.com/corda/corda.git
synced 2025-03-16 00:55:24 +00:00
Merge pull request #1349 from corda/mike-merge-2fae95c58f9
Merge to 2fae95c58f9
This commit is contained in:
commit
153519e72e
@ -5,14 +5,15 @@ import java.security.Provider
|
||||
import java.security.SecureRandom
|
||||
import java.security.SecureRandomSpi
|
||||
import java.security.Security
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/**
|
||||
* Temporarily restore Sun's [SecureRandom] provider.
|
||||
* This is ONLY for allowing us to generate test data, e.g. signatures.
|
||||
*/
|
||||
class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"), AutoCloseable {
|
||||
class CheatingSecurityProvider : Provider("Cheat-${counter.getAndIncrement()}", 1.8, "Cheat security provider"), AutoCloseable {
|
||||
private companion object {
|
||||
private const val NAME = "Cheat!"
|
||||
private val counter = AtomicInteger()
|
||||
}
|
||||
|
||||
init {
|
||||
@ -21,7 +22,7 @@ class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"),
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
Security.removeProvider(NAME)
|
||||
Security.removeProvider(name)
|
||||
}
|
||||
|
||||
private class SunSecureRandom : SecureRandom(sun.security.provider.SecureRandom(), null)
|
||||
|
@ -208,6 +208,11 @@ interface SerializationContext {
|
||||
*/
|
||||
fun withEncoding(encoding: SerializationEncoding?): SerializationContext
|
||||
|
||||
/**
|
||||
* A shallow copy of this context but with the given encoding whitelist.
|
||||
*/
|
||||
fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist): SerializationContext
|
||||
|
||||
/**
|
||||
* The use case that we are serializing for, since it influences the implementations chosen.
|
||||
*/
|
||||
|
@ -11,23 +11,36 @@
|
||||
package net.corda.core.cordapp
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.Crypto.generateKeyPair
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.copyToDirectory
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA
|
||||
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.createDevNodeCa
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.smoketesting.NodeConfig
|
||||
import net.corda.smoketesting.NodeProcess
|
||||
import net.corda.smoketesting.NodeProcess.Companion.CORDAPPS_DIR_NAME
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.streams.toList
|
||||
|
||||
@ -50,13 +63,21 @@ class CordappSmokeTest {
|
||||
|
||||
@Test
|
||||
fun `FlowContent appName returns the filename of the CorDapp jar`() {
|
||||
val cordappsDir = (factory.baseDirectory(aliceConfig) / CORDAPPS_DIR_NAME).createDirectories()
|
||||
val baseDir = factory.baseDirectory(aliceConfig)
|
||||
val cordappsDir = (baseDir / CORDAPPS_DIR_NAME).createDirectories()
|
||||
// Find the jar file for the smoke tests of this module
|
||||
val selfCordapp = Paths.get("build", "libs").list {
|
||||
it.filter { "-smokeTests" in it.toString() }.toList().single()
|
||||
}
|
||||
selfCordapp.copyToDirectory(cordappsDir)
|
||||
|
||||
// The `nodeReadyFuture` in the persistent network map cache will not complete unless there is at least one other
|
||||
// node in the network. We work around this limitation by putting another node info file in the additional-node-info
|
||||
// folder.
|
||||
// TODO clean this up after we refactor the persistent network map cache / network map updater
|
||||
val additionalNodeInfoDir = (baseDir / "additional-node-infos").createDirectories()
|
||||
createDummyNodeInfo(additionalNodeInfoDir)
|
||||
|
||||
factory.create(aliceConfig).use { alice ->
|
||||
alice.connect().use { connectionToAlice ->
|
||||
val aliceIdentity = connectionToAlice.proxy.nodeInfo().legalIdentitiesAndCerts.first().party
|
||||
@ -100,4 +121,39 @@ class CordappSmokeTest {
|
||||
otherPartySession.send(sessionInitContext)
|
||||
}
|
||||
}
|
||||
|
||||
private fun createDummyNodeInfo(additionalNodeInfoDir: Path) {
|
||||
val dummyKeyPair = generateKeyPair()
|
||||
val nodeInfo = createNodeInfoWithSingleIdentity(CordaX500Name(organisation = "Bob Corp", locality = "Madrid", country = "ES"), dummyKeyPair, dummyKeyPair.public)
|
||||
val signedNodeInfo = signWith(nodeInfo, listOf(dummyKeyPair.private))
|
||||
(additionalNodeInfoDir / "nodeInfo-41408E093F95EAD51F6892C34DEB65AE1A3569A4B0E5744769A1B485AF8E04B5").write(signedNodeInfo.serialize().bytes)
|
||||
}
|
||||
|
||||
private fun createNodeInfoWithSingleIdentity(name: CordaX500Name, nodeKeyPair: KeyPair, identityCertPublicKey: PublicKey): NodeInfo {
|
||||
val nodeCertificateAndKeyPair = createDevNodeCa(DEV_INTERMEDIATE_CA, name, nodeKeyPair)
|
||||
val identityCert = X509Utilities.createCertificate(
|
||||
CertificateType.LEGAL_IDENTITY,
|
||||
nodeCertificateAndKeyPair.certificate,
|
||||
nodeCertificateAndKeyPair.keyPair,
|
||||
nodeCertificateAndKeyPair.certificate.subjectX500Principal,
|
||||
identityCertPublicKey)
|
||||
val certPath = X509Utilities.buildCertPath(
|
||||
identityCert,
|
||||
nodeCertificateAndKeyPair.certificate,
|
||||
DEV_INTERMEDIATE_CA.certificate,
|
||||
DEV_ROOT_CA.certificate)
|
||||
val partyAndCertificate = PartyAndCertificate(certPath)
|
||||
return NodeInfo(
|
||||
listOf(NetworkHostAndPort("my.${partyAndCertificate.party.name.organisation}.com", 1234)),
|
||||
listOf(partyAndCertificate),
|
||||
1,
|
||||
1
|
||||
)
|
||||
}
|
||||
|
||||
private fun signWith(nodeInfo: NodeInfo, keys: List<PrivateKey>): SignedNodeInfo {
|
||||
val serialized = nodeInfo.serialize()
|
||||
val signatures = keys.map { it.sign(serialized.bytes) }
|
||||
return SignedNodeInfo(serialized, signatures)
|
||||
}
|
||||
}
|
||||
|
@ -1,69 +0,0 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode
|
||||
import net.corda.node.services.transactions.minClusterSize
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
|
||||
class BFTSMaRtTests : IntegrationTest() {
|
||||
companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas("node_0", "node_1", "node_2", "node_3", "node_4", "node_5",
|
||||
"node_6", "node_7", "node_8", "node_9")
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"))
|
||||
}
|
||||
|
||||
@After
|
||||
fun stopNodes() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
/** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */
|
||||
@Test
|
||||
fun `all replicas start even if there is a new consensus during startup`() {
|
||||
val clusterSize = minClusterSize(1)
|
||||
val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race.
|
||||
val f = node.run {
|
||||
val trivialTx = signInitialTransaction(notary) {
|
||||
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||
}
|
||||
// Create a new consensus while the redundant replica is sleeping:
|
||||
services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture
|
||||
}
|
||||
mockNet.runNetwork()
|
||||
f.getOrThrow()
|
||||
}
|
||||
|
||||
private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction {
|
||||
return services.signInitialTransaction(
|
||||
TransactionBuilder(notary).apply {
|
||||
addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey))
|
||||
block()
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
@ -146,7 +146,7 @@ open class NodeStartup(val args: Array<String>) {
|
||||
|
||||
private fun Exception.logAsExpected(message: String? = this.message, print: (String?) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]")
|
||||
|
||||
private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]", error)
|
||||
private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message${this.message?.let { ": $it" } ?: ""} [errorCode=${errorCode()}]", error)
|
||||
|
||||
private fun Exception.isOpenJdkKnownIssue() = message?.startsWith("Unknown named curve:") == true
|
||||
|
||||
|
@ -54,7 +54,6 @@ class TestScheme : AbstractKryoSerializationScheme() {
|
||||
override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException()
|
||||
|
||||
}
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
@ -99,7 +98,6 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null))
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
|
||||
val noReferencesContext = context.withoutReferences()
|
||||
@ -366,4 +364,16 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
|
||||
assertEquals(encodingNotPermittedFormat.format(compression), message)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `compression reduces number of bytes significantly`() {
|
||||
class Holder(val holder: ByteArray)
|
||||
|
||||
val obj = Holder(ByteArray(20000))
|
||||
val uncompressedSize = obj.serialize(factory, context.withEncoding(null)).size
|
||||
val compressedSize = obj.serialize(factory, context.withEncoding(CordaSerializationEncoding.SNAPPY)).size
|
||||
// If these need fixing, sounds like Kryo wire format changed and checkpoints might not surive an upgrade.
|
||||
assertEquals(20222, uncompressedSize)
|
||||
assertEquals(1111, compressedSize)
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.serialization.internal.OrdinalBits.OrdinalWriter
|
||||
import org.iq80.snappy.SnappyFramedInputStream
|
||||
import org.iq80.snappy.SnappyFramedOutputStream
|
||||
import java.io.IOException
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
import java.nio.ByteBuffer
|
||||
@ -54,7 +55,7 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter {
|
||||
override fun wrap(stream: InputStream) = InflaterInputStream(stream)
|
||||
},
|
||||
SNAPPY {
|
||||
override fun wrap(stream: OutputStream) = SnappyFramedOutputStream(stream)
|
||||
override fun wrap(stream: OutputStream) = FlushAverseOutputStream(SnappyFramedOutputStream(stream))
|
||||
override fun wrap(stream: InputStream) = SnappyFramedInputStream(stream, false)
|
||||
};
|
||||
|
||||
@ -68,3 +69,21 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter {
|
||||
}
|
||||
|
||||
const val encodingNotPermittedFormat = "Encoding not permitted: %s"
|
||||
|
||||
/**
|
||||
* Has an empty flush implementation. This is because Kryo keeps calling flush all the time, which stops the Snappy
|
||||
* stream from building up big chunks to compress and instead keeps compressing small chunks giving terrible compression ratio.
|
||||
*/
|
||||
class FlushAverseOutputStream(private val delegate: OutputStream) : OutputStream() {
|
||||
@Throws(IOException::class)
|
||||
override fun write(b: Int) = delegate.write(b)
|
||||
|
||||
@Throws(IOException::class)
|
||||
override fun write(b: ByteArray?, off: Int, len: Int) = delegate.write(b, off, len)
|
||||
|
||||
@Throws(IOException::class)
|
||||
override fun close() {
|
||||
delegate.flush()
|
||||
delegate.close()
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +77,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
|
||||
|
||||
override fun withPreferredSerializationVersion(magic: SerializationMagic) = copy(preferredSerializationVersion = magic)
|
||||
override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding)
|
||||
override fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist) = copy(encodingWhitelist = encodingWhitelist)
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -39,9 +39,8 @@ data class ObjectAndEnvelope<out T>(val obj: T, val envelope: Envelope)
|
||||
* instances and threads.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class DeserializationInput @JvmOverloads constructor(
|
||||
private val serializerFactory: SerializerFactory,
|
||||
private val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist
|
||||
class DeserializationInput constructor(
|
||||
private val serializerFactory: SerializerFactory
|
||||
) {
|
||||
private val objectHistory: MutableList<Any> = mutableListOf()
|
||||
private val logger = loggerFor<DeserializationInput>()
|
||||
@ -90,9 +89,9 @@ class DeserializationInput @JvmOverloads constructor(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
@Throws(AMQPNoTypeNotSerializableException::class)
|
||||
fun getEnvelope(byteSequence: ByteSequence) = getEnvelope(byteSequence, encodingWhitelist)
|
||||
fun getEnvelope(byteSequence: ByteSequence, context: SerializationContext) = getEnvelope(byteSequence, context.encodingWhitelist)
|
||||
|
||||
@Throws(
|
||||
AMQPNotSerializableException::class,
|
||||
@ -126,7 +125,7 @@ class DeserializationInput @JvmOverloads constructor(
|
||||
@Throws(NotSerializableException::class)
|
||||
fun <T : Any> deserialize(bytes: ByteSequence, clazz: Class<T>, context: SerializationContext): T =
|
||||
des {
|
||||
val envelope = getEnvelope(bytes, encodingWhitelist)
|
||||
val envelope = getEnvelope(bytes, context.encodingWhitelist)
|
||||
|
||||
logger.trace("deserialize blob scheme=\"${envelope.schema.toString()}\"")
|
||||
|
||||
@ -140,7 +139,7 @@ class DeserializationInput @JvmOverloads constructor(
|
||||
clazz: Class<T>,
|
||||
context: SerializationContext
|
||||
): ObjectAndEnvelope<T> = des {
|
||||
val envelope = getEnvelope(bytes, encodingWhitelist)
|
||||
val envelope = getEnvelope(bytes, context.encodingWhitelist)
|
||||
// Now pick out the obj and schema from the envelope.
|
||||
ObjectAndEnvelope(
|
||||
clazz.cast(readObjectOrNull(
|
||||
|
@ -12,7 +12,6 @@ package net.corda.serialization.internal.amqp
|
||||
|
||||
import net.corda.core.KeepForDJVM
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationEncoding
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding
|
||||
@ -38,9 +37,8 @@ data class BytesAndSchemas<T : Any>(
|
||||
* instances and threads.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
open class SerializationOutput @JvmOverloads constructor(
|
||||
internal val serializerFactory: SerializerFactory,
|
||||
private val encoding: SerializationEncoding? = null
|
||||
open class SerializationOutput constructor(
|
||||
internal val serializerFactory: SerializerFactory
|
||||
) {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
@ -100,6 +98,7 @@ open class SerializationOutput @JvmOverloads constructor(
|
||||
var stream: OutputStream = it
|
||||
try {
|
||||
amqpMagic.writeTo(stream)
|
||||
val encoding = context.encoding
|
||||
if (encoding != null) {
|
||||
SectionId.ENCODING.writeTo(stream)
|
||||
(encoding as CordaSerializationEncoding).writeTo(stream)
|
||||
|
@ -18,9 +18,9 @@ import net.corda.node.services.statemachine.DataSessionMessage
|
||||
import net.corda.serialization.internal.amqp.DeserializationInput
|
||||
import net.corda.serialization.internal.amqp.Envelope
|
||||
import net.corda.serialization.internal.amqp.SerializerFactory
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.amqpSpecific
|
||||
import net.corda.testing.internal.kryoSpecific
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Assert.assertEquals
|
||||
@ -38,7 +38,7 @@ class ListsSerializationTest {
|
||||
fun <T : Any> verifyEnvelope(serBytes: SerializedBytes<T>, envVerBody: (Envelope) -> Unit) =
|
||||
amqpSpecific("AMQP specific envelope verification") {
|
||||
val context = SerializationFactory.defaultFactory.defaultContext
|
||||
val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes)
|
||||
val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes, context)
|
||||
envVerBody(envelope)
|
||||
}
|
||||
}
|
||||
|
@ -229,8 +229,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
freshDeserializationFactory: SerializerFactory = defaultFactory(),
|
||||
expectedEqual: Boolean = true,
|
||||
expectDeserializedEqual: Boolean = true): T {
|
||||
val ser = SerializationOutput(factory, compression)
|
||||
val bytes = ser.serialize(obj)
|
||||
val ser = SerializationOutput(factory)
|
||||
val bytes = ser.serialize(obj, compression)
|
||||
|
||||
val decoder = DecoderImpl().apply {
|
||||
this.register(Envelope.DESCRIPTOR, Envelope.Companion)
|
||||
@ -251,14 +251,14 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
val result = decoder.readObject() as Envelope
|
||||
assertNotNull(result)
|
||||
}
|
||||
val des = DeserializationInput(freshDeserializationFactory, encodingWhitelist)
|
||||
val desObj = des.deserialize(bytes)
|
||||
val des = DeserializationInput(freshDeserializationFactory)
|
||||
val desObj = des.deserialize(bytes, testSerializationContext.withEncodingWhitelist(encodingWhitelist))
|
||||
assertTrue(Objects.deepEquals(obj, desObj) == expectedEqual)
|
||||
|
||||
// Now repeat with a re-used factory
|
||||
val ser2 = SerializationOutput(factory, compression)
|
||||
val des2 = DeserializationInput(factory, encodingWhitelist)
|
||||
val desObj2 = des2.deserialize(ser2.serialize(obj))
|
||||
val ser2 = SerializationOutput(factory)
|
||||
val des2 = DeserializationInput(factory)
|
||||
val desObj2 = des2.deserialize(ser2.serialize(obj, compression), testSerializationContext.withEncodingWhitelist(encodingWhitelist))
|
||||
assertTrue(Objects.deepEquals(obj, desObj2) == expectedEqual)
|
||||
assertTrue(Objects.deepEquals(desObj, desObj2) == expectDeserializedEqual)
|
||||
|
||||
@ -481,10 +481,10 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
@Test
|
||||
fun `class constructor is invoked on deserialisation`() {
|
||||
compression == null || return // Manipulation of serialized bytes is invalid if they're compressed.
|
||||
val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()), compression)
|
||||
val des = DeserializationInput(ser.serializerFactory, encodingWhitelist)
|
||||
val serialisedOne = ser.serialize(NonZeroByte(1)).bytes
|
||||
val serialisedTwo = ser.serialize(NonZeroByte(2)).bytes
|
||||
val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()))
|
||||
val des = DeserializationInput(ser.serializerFactory)
|
||||
val serialisedOne = ser.serialize(NonZeroByte(1), compression).bytes
|
||||
val serialisedTwo = ser.serialize(NonZeroByte(2), compression).bytes
|
||||
|
||||
// Find the index that holds the value byte
|
||||
val valueIndex = serialisedOne.zip(serialisedTwo).mapIndexedNotNull { index, (oneByte, twoByte) ->
|
||||
@ -495,12 +495,12 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
|
||||
// Double check
|
||||
copy[valueIndex] = 0x03
|
||||
assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext).value).isEqualTo(3)
|
||||
assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist)).value).isEqualTo(3)
|
||||
|
||||
// Now use the forbidden value
|
||||
copy[valueIndex] = 0x00
|
||||
assertThatExceptionOfType(NotSerializableException::class.java).isThrownBy {
|
||||
des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext)
|
||||
des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist))
|
||||
}.withStackTraceContaining("Zero not allowed")
|
||||
}
|
||||
|
||||
@ -1208,7 +1208,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
val c = C(Amount(100, BigDecimal("1.5"), Currency.getInstance("USD")))
|
||||
|
||||
// were the issue not fixed we'd blow up here
|
||||
SerializationOutput(factory, compression).serialize(c)
|
||||
SerializationOutput(factory).serialize(c, compression)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1216,9 +1216,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
compression ?: return
|
||||
val factory = defaultFactory()
|
||||
val data = ByteArray(12345).also { Random(0).nextBytes(it) }.let { it + it }
|
||||
val compressed = SerializationOutput(factory, compression).serialize(data)
|
||||
val compressed = SerializationOutput(factory).serialize(data, compression)
|
||||
assertEquals(.5, compressed.size.toDouble() / data.size, .03)
|
||||
assertArrayEquals(data, DeserializationInput(factory, encodingWhitelist).deserialize(compressed))
|
||||
assertArrayEquals(data, DeserializationInput(factory).deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist)))
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1226,9 +1226,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
compression ?: return
|
||||
val factory = defaultFactory()
|
||||
doReturn(false).whenever(encodingWhitelist).acceptEncoding(compression)
|
||||
val compressed = SerializationOutput(factory, compression).serialize("whatever")
|
||||
val input = DeserializationInput(factory, encodingWhitelist)
|
||||
catchThrowable { input.deserialize(compressed) }.run {
|
||||
val compressed = SerializationOutput(factory).serialize("whatever", compression)
|
||||
val input = DeserializationInput(factory)
|
||||
catchThrowable { input.deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) }.run {
|
||||
assertSame(NotSerializableException::class.java, javaClass)
|
||||
assertEquals(encodingNotPermittedFormat.format(compression), message)
|
||||
}
|
||||
@ -1358,5 +1358,16 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
|
||||
throw Error("Deserializing serialized \$C should not throw")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `compression reduces number of bytes significantly`() {
|
||||
val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()))
|
||||
val obj = ByteArray(20000)
|
||||
val uncompressedSize = ser.serialize(obj).bytes.size
|
||||
val compressedSize = ser.serialize(obj, CordaSerializationEncoding.SNAPPY).bytes.size
|
||||
// Ordinarily this might be considered high maintenance, but we promised wire compatibility, so they'd better not change!
|
||||
assertEquals(20059, uncompressedSize)
|
||||
assertEquals(1018, compressedSize)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import net.corda.core.internal.copyTo
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationEncoding
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
@ -98,9 +99,9 @@ fun <T : Any> SerializationOutput.serializeAndReturnSchema(
|
||||
|
||||
|
||||
@Throws(NotSerializableException::class)
|
||||
fun <T : Any> SerializationOutput.serialize(obj: T): SerializedBytes<T> {
|
||||
fun <T : Any> SerializationOutput.serialize(obj: T, encoding: SerializationEncoding? = null): SerializedBytes<T> {
|
||||
try {
|
||||
return _serialize(obj, testSerializationContext)
|
||||
return _serialize(obj, testSerializationContext.withEncoding(encoding))
|
||||
} finally {
|
||||
andFinally()
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ class BlobInspector : Runnable {
|
||||
?: throw IllegalArgumentException("Error: this input does not appear to be encoded in Corda's AMQP extended format, sorry.")
|
||||
|
||||
if (schema) {
|
||||
val envelope = DeserializationInput.getEnvelope(bytes.sequence())
|
||||
val envelope = DeserializationInput.getEnvelope(bytes.sequence(), SerializationDefaults.STORAGE_CONTEXT.encodingWhitelist)
|
||||
out.println(envelope.schema)
|
||||
out.println()
|
||||
out.println(envelope.transformsSchema)
|
||||
|
@ -53,7 +53,7 @@ class Explorer internal constructor(private val explorerController: ExplorerCont
|
||||
val user = config.nodeConfig.rpcUsers[0]
|
||||
val p = explorerController.process(
|
||||
"--host=localhost",
|
||||
"--port=${config.nodeConfig.rpcAddress.port}",
|
||||
"--port=${config.nodeConfig.rpcSettings.address.port}",
|
||||
"--username=${user.username}",
|
||||
"--password=${user.password}")
|
||||
.directory(explorerDir.toFile())
|
||||
|
@ -32,7 +32,7 @@ class InstallFactory : Controller() {
|
||||
|
||||
val nodeConfig = config.parseAs<NodeConfig>(UnknownConfigKeysPolicy.IGNORE::handle)
|
||||
nodeConfig.p2pAddress.checkPort()
|
||||
nodeConfig.rpcAddress.checkPort()
|
||||
nodeConfig.rpcSettings.address.checkPort()
|
||||
nodeConfig.webAddress.checkPort()
|
||||
|
||||
val tempDir = Files.createTempDirectory(baseDir, ".node")
|
||||
|
@ -30,8 +30,7 @@ import java.util.Properties
|
||||
data class NodeConfig(
|
||||
val myLegalName: CordaX500Name,
|
||||
val p2pAddress: NetworkHostAndPort,
|
||||
val rpcAddress: NetworkHostAndPort,
|
||||
val rpcAdminAddress: NetworkHostAndPort,
|
||||
val rpcSettings: NodeRpcSettings,
|
||||
/** This is not used by the node but by the webserver which looks at node.conf. */
|
||||
val webAddress: NetworkHostAndPort,
|
||||
val notary: NotaryService?,
|
||||
@ -54,8 +53,8 @@ data class NodeConfig(
|
||||
|
||||
fun nodeConf(): Config {
|
||||
val rpcSettings: ConfigObject = empty()
|
||||
.withValue("address", valueFor(rpcAddress.toString()))
|
||||
.withValue("adminAddress", valueFor(rpcAdminAddress.toString()))
|
||||
.withValue("address", valueFor(rpcSettings.address.toString()))
|
||||
.withValue("adminAddress", valueFor(rpcSettings.adminAddress.toString()))
|
||||
.root()
|
||||
val customMap: Map<String, Any> = HashMap<String, Any>().also {
|
||||
if (issuableCurrencies.isNotEmpty()) {
|
||||
@ -63,7 +62,7 @@ data class NodeConfig(
|
||||
}
|
||||
}
|
||||
val custom: ConfigObject = ConfigFactory.parseMap(customMap).root()
|
||||
return NodeConfigurationData(myLegalName, p2pAddress, rpcAddress, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode)
|
||||
return NodeConfigurationData(myLegalName, p2pAddress, this.rpcSettings.address, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode)
|
||||
.toConfig()
|
||||
.withoutPath("rpcAddress")
|
||||
.withoutPath("rpcAdminAddress")
|
||||
@ -71,7 +70,7 @@ data class NodeConfig(
|
||||
.withOptionalValue("custom", custom)
|
||||
}
|
||||
|
||||
fun webServerConf() = WebServerConfigurationData(myLegalName, rpcAddress, webAddress, rpcUsers).asConfig()
|
||||
fun webServerConf() = WebServerConfigurationData(myLegalName, rpcSettings.address, webAddress, rpcUsers).asConfig()
|
||||
|
||||
fun toNodeConfText() = nodeConf().render()
|
||||
|
||||
|
@ -80,8 +80,10 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
|
||||
country = location.countryCode
|
||||
),
|
||||
p2pAddress = nodeData.p2pPort.toLocalAddress(),
|
||||
rpcAddress = nodeData.rpcPort.toLocalAddress(),
|
||||
rpcAdminAddress = nodeData.rpcAdminPort.toLocalAddress(),
|
||||
rpcSettings = NodeRpcSettings(
|
||||
address = nodeData.rpcPort.toLocalAddress(),
|
||||
adminAddress = nodeData.rpcAdminPort.toLocalAddress()
|
||||
),
|
||||
webAddress = nodeData.webPort.toLocalAddress(),
|
||||
notary = notary,
|
||||
h2port = nodeData.h2Port.value,
|
||||
@ -212,7 +214,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
|
||||
}
|
||||
|
||||
private fun updatePort(config: NodeConfig) {
|
||||
val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcAddress.port, config.webAddress.port, config.h2port).max() as Int
|
||||
val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcSettings.address.port, config.webAddress.port, config.h2port).max() as Int
|
||||
port.getAndUpdate { Math.max(nextPort, it) }
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,8 @@
|
||||
package net.corda.demobench.model
|
||||
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
|
||||
data class NodeRpcSettings(
|
||||
val address: NetworkHostAndPort,
|
||||
val adminAddress: NetworkHostAndPort
|
||||
)
|
@ -25,7 +25,7 @@ class NodeRPC(config: NodeConfigWrapper, start: (NodeConfigWrapper, CordaRPCOps)
|
||||
private val oneSecond = SECONDS.toMillis(1)
|
||||
}
|
||||
|
||||
private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcAddress.port))
|
||||
private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcSettings.address.port))
|
||||
@Volatile
|
||||
private var rpcConnection: CordaRPCConnection? = null
|
||||
private val timer = Timer("DemoBench NodeRPC (${config.key})", true)
|
||||
|
@ -21,7 +21,10 @@ import net.corda.webserver.WebServerConfig
|
||||
import org.junit.Test
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import kotlin.test.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class NodeConfigTest {
|
||||
companion object {
|
||||
@ -32,21 +35,21 @@ class NodeConfigTest {
|
||||
@Test
|
||||
fun `reading node configuration`() {
|
||||
val config = createConfig(
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 40002,
|
||||
rpcAdminPort = 40005,
|
||||
webPort = 20001,
|
||||
h2port = 30001,
|
||||
notary = NotaryService(validating = false),
|
||||
users = listOf(user("jenny"))
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 40002,
|
||||
rpcAdminPort = 40005,
|
||||
webPort = 20001,
|
||||
h2port = 30001,
|
||||
notary = NotaryService(validating = false),
|
||||
users = listOf(user("jenny"))
|
||||
)
|
||||
|
||||
val nodeConfig = config.nodeConf()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("reference.conf"))
|
||||
.withFallback(ConfigFactory.parseMap(mapOf("devMode" to true)))
|
||||
.resolve()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("reference.conf"))
|
||||
.withFallback(ConfigFactory.parseMap(mapOf("devMode" to true)))
|
||||
.resolve()
|
||||
val fullConfig = nodeConfig.parseAsNodeConfiguration()
|
||||
|
||||
// No custom configuration is created by default.
|
||||
@ -63,20 +66,20 @@ class NodeConfigTest {
|
||||
@Test
|
||||
fun `reading node configuration with currencies`() {
|
||||
val config = createConfig(
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 10002,
|
||||
rpcAdminPort = 10003,
|
||||
webPort = 10004,
|
||||
h2port = 10005,
|
||||
notary = NotaryService(validating = false),
|
||||
issuableCurrencies = listOf("GBP")
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 10002,
|
||||
rpcAdminPort = 10003,
|
||||
webPort = 10004,
|
||||
h2port = 10005,
|
||||
notary = NotaryService(validating = false),
|
||||
issuableCurrencies = listOf("GBP")
|
||||
)
|
||||
|
||||
val nodeConfig = config.nodeConf()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("reference.conf"))
|
||||
.resolve()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("reference.conf"))
|
||||
.resolve()
|
||||
val custom = nodeConfig.getConfig("custom")
|
||||
assertEquals(listOf("GBP"), custom.getAnyRefList("issuableCurrencies"))
|
||||
}
|
||||
@ -84,20 +87,20 @@ class NodeConfigTest {
|
||||
@Test
|
||||
fun `reading webserver configuration`() {
|
||||
val config = createConfig(
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 40002,
|
||||
rpcAdminPort = 40003,
|
||||
webPort = 20001,
|
||||
h2port = 30001,
|
||||
notary = NotaryService(validating = false),
|
||||
users = listOf(user("jenny"))
|
||||
legalName = myLegalName,
|
||||
p2pPort = 10001,
|
||||
rpcPort = 40002,
|
||||
rpcAdminPort = 40003,
|
||||
webPort = 20001,
|
||||
h2port = 30001,
|
||||
notary = NotaryService(validating = false),
|
||||
users = listOf(user("jenny"))
|
||||
)
|
||||
|
||||
val nodeConfig = config.webServerConf()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("web-reference.conf"))
|
||||
.resolve()
|
||||
.withValue("baseDirectory", valueFor(baseDir.toString()))
|
||||
.withFallback(ConfigFactory.parseResources("web-reference.conf"))
|
||||
.resolve()
|
||||
val webConfig = WebServerConfig(baseDir, nodeConfig)
|
||||
|
||||
// No custom configuration is created by default.
|
||||
@ -110,26 +113,28 @@ class NodeConfigTest {
|
||||
}
|
||||
|
||||
private fun createConfig(
|
||||
legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"),
|
||||
p2pPort: Int = -1,
|
||||
rpcPort: Int = -1,
|
||||
rpcAdminPort: Int = -1,
|
||||
webPort: Int = -1,
|
||||
h2port: Int = -1,
|
||||
notary: NotaryService?,
|
||||
users: List<User> = listOf(user("guest")),
|
||||
issuableCurrencies: List<String> = emptyList()
|
||||
legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"),
|
||||
p2pPort: Int = -1,
|
||||
rpcPort: Int = -1,
|
||||
rpcAdminPort: Int = -1,
|
||||
webPort: Int = -1,
|
||||
h2port: Int = -1,
|
||||
notary: NotaryService?,
|
||||
users: List<User> = listOf(user("guest")),
|
||||
issuableCurrencies: List<String> = emptyList()
|
||||
): NodeConfig {
|
||||
return NodeConfig(
|
||||
myLegalName = legalName,
|
||||
p2pAddress = localPort(p2pPort),
|
||||
rpcAddress = localPort(rpcPort),
|
||||
rpcAdminAddress = localPort(rpcAdminPort),
|
||||
webAddress = localPort(webPort),
|
||||
h2port = h2port,
|
||||
notary = notary,
|
||||
rpcUsers = users,
|
||||
issuableCurrencies = issuableCurrencies
|
||||
myLegalName = legalName,
|
||||
p2pAddress = localPort(p2pPort),
|
||||
rpcSettings = NodeRpcSettings(
|
||||
address = localPort(rpcPort),
|
||||
adminAddress = localPort(rpcAdminPort)
|
||||
),
|
||||
webAddress = localPort(webPort),
|
||||
h2port = h2port,
|
||||
notary = notary,
|
||||
rpcUsers = users,
|
||||
issuableCurrencies = issuableCurrencies
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -173,8 +173,10 @@ class NodeControllerTest {
|
||||
country = "US"
|
||||
),
|
||||
p2pAddress = localPort(p2pPort),
|
||||
rpcAddress = localPort(rpcPort),
|
||||
rpcAdminAddress = localPort(rpcAdminPort),
|
||||
rpcSettings = NodeRpcSettings(
|
||||
address = localPort(rpcPort),
|
||||
adminAddress = localPort(rpcAdminPort)
|
||||
),
|
||||
webAddress = localPort(webPort),
|
||||
h2port = h2port,
|
||||
notary = notary,
|
||||
|
Loading…
x
Reference in New Issue
Block a user