diff --git a/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt b/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt index 048c0cae68..f62c67b2c3 100644 --- a/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt +++ b/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt @@ -5,14 +5,15 @@ import java.security.Provider import java.security.SecureRandom import java.security.SecureRandomSpi import java.security.Security +import java.util.concurrent.atomic.AtomicInteger /** * Temporarily restore Sun's [SecureRandom] provider. * This is ONLY for allowing us to generate test data, e.g. signatures. */ -class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"), AutoCloseable { +class CheatingSecurityProvider : Provider("Cheat-${counter.getAndIncrement()}", 1.8, "Cheat security provider"), AutoCloseable { private companion object { - private const val NAME = "Cheat!" + private val counter = AtomicInteger() } init { @@ -21,7 +22,7 @@ class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"), } override fun close() { - Security.removeProvider(NAME) + Security.removeProvider(name) } private class SunSecureRandom : SecureRandom(sun.security.provider.SecureRandom(), null) diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt index b31096674f..c750a6451b 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt @@ -208,6 +208,11 @@ interface SerializationContext { */ fun withEncoding(encoding: SerializationEncoding?): SerializationContext + /** + * A shallow copy of this context but with the given encoding whitelist. + */ + fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist): SerializationContext + /** * The use case that we are serializing for, since it influences the implementations chosen. */ diff --git a/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt b/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt index d3c1fa0444..161f7d4195 100644 --- a/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt +++ b/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt @@ -11,23 +11,36 @@ package net.corda.core.cordapp import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.Crypto.generateKeyPair +import net.corda.core.crypto.sign import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party -import net.corda.core.internal.copyToDirectory -import net.corda.core.internal.createDirectories -import net.corda.core.internal.div -import net.corda.core.internal.list +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.internal.* import net.corda.core.messaging.startFlow +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap +import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA +import net.corda.nodeapi.internal.DEV_ROOT_CA +import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.config.User +import net.corda.nodeapi.internal.createDevNodeCa +import net.corda.nodeapi.internal.crypto.CertificateType +import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeProcess import net.corda.smoketesting.NodeProcess.Companion.CORDAPPS_DIR_NAME import org.assertj.core.api.Assertions.assertThat import org.junit.Test +import java.nio.file.Path import java.nio.file.Paths +import java.security.KeyPair +import java.security.PrivateKey +import java.security.PublicKey import java.util.concurrent.atomic.AtomicInteger import kotlin.streams.toList @@ -50,13 +63,21 @@ class CordappSmokeTest { @Test fun `FlowContent appName returns the filename of the CorDapp jar`() { - val cordappsDir = (factory.baseDirectory(aliceConfig) / CORDAPPS_DIR_NAME).createDirectories() + val baseDir = factory.baseDirectory(aliceConfig) + val cordappsDir = (baseDir / CORDAPPS_DIR_NAME).createDirectories() // Find the jar file for the smoke tests of this module val selfCordapp = Paths.get("build", "libs").list { it.filter { "-smokeTests" in it.toString() }.toList().single() } selfCordapp.copyToDirectory(cordappsDir) + // The `nodeReadyFuture` in the persistent network map cache will not complete unless there is at least one other + // node in the network. We work around this limitation by putting another node info file in the additional-node-info + // folder. + // TODO clean this up after we refactor the persistent network map cache / network map updater + val additionalNodeInfoDir = (baseDir / "additional-node-infos").createDirectories() + createDummyNodeInfo(additionalNodeInfoDir) + factory.create(aliceConfig).use { alice -> alice.connect().use { connectionToAlice -> val aliceIdentity = connectionToAlice.proxy.nodeInfo().legalIdentitiesAndCerts.first().party @@ -100,4 +121,39 @@ class CordappSmokeTest { otherPartySession.send(sessionInitContext) } } + + private fun createDummyNodeInfo(additionalNodeInfoDir: Path) { + val dummyKeyPair = generateKeyPair() + val nodeInfo = createNodeInfoWithSingleIdentity(CordaX500Name(organisation = "Bob Corp", locality = "Madrid", country = "ES"), dummyKeyPair, dummyKeyPair.public) + val signedNodeInfo = signWith(nodeInfo, listOf(dummyKeyPair.private)) + (additionalNodeInfoDir / "nodeInfo-41408E093F95EAD51F6892C34DEB65AE1A3569A4B0E5744769A1B485AF8E04B5").write(signedNodeInfo.serialize().bytes) + } + + private fun createNodeInfoWithSingleIdentity(name: CordaX500Name, nodeKeyPair: KeyPair, identityCertPublicKey: PublicKey): NodeInfo { + val nodeCertificateAndKeyPair = createDevNodeCa(DEV_INTERMEDIATE_CA, name, nodeKeyPair) + val identityCert = X509Utilities.createCertificate( + CertificateType.LEGAL_IDENTITY, + nodeCertificateAndKeyPair.certificate, + nodeCertificateAndKeyPair.keyPair, + nodeCertificateAndKeyPair.certificate.subjectX500Principal, + identityCertPublicKey) + val certPath = X509Utilities.buildCertPath( + identityCert, + nodeCertificateAndKeyPair.certificate, + DEV_INTERMEDIATE_CA.certificate, + DEV_ROOT_CA.certificate) + val partyAndCertificate = PartyAndCertificate(certPath) + return NodeInfo( + listOf(NetworkHostAndPort("my.${partyAndCertificate.party.name.organisation}.com", 1234)), + listOf(partyAndCertificate), + 1, + 1 + ) + } + + private fun signWith(nodeInfo: NodeInfo, keys: List): SignedNodeInfo { + val serialized = nodeInfo.serialize() + val signatures = keys.map { it.sign(serialized.bytes) } + return SignedNodeInfo(serialized, signatures) + } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt deleted file mode 100644 index 05e45a0bff..0000000000 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt +++ /dev/null @@ -1,69 +0,0 @@ -package net.corda.node.services - -import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint -import net.corda.core.flows.NotaryFlow -import net.corda.core.identity.Party -import net.corda.core.transactions.SignedTransaction -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.getOrThrow -import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode -import net.corda.node.services.transactions.minClusterSize -import net.corda.testing.contracts.DummyContract -import net.corda.testing.core.dummyCommand -import net.corda.testing.core.singleIdentity -import net.corda.testing.internal.IntegrationTest -import net.corda.testing.internal.IntegrationTestSchemas -import net.corda.testing.node.internal.cordappsForPackages -import net.corda.testing.node.internal.InternalMockNetwork -import net.corda.testing.node.internal.TestStartedNode -import net.corda.testing.node.internal.startFlow -import org.junit.After -import org.junit.Before -import org.junit.ClassRule -import org.junit.Test - -class BFTSMaRtTests : IntegrationTest() { - companion object { - @ClassRule - @JvmField - val databaseSchemas = IntegrationTestSchemas("node_0", "node_1", "node_2", "node_3", "node_4", "node_5", - "node_6", "node_7", "node_8", "node_9") - } - - private lateinit var mockNet: InternalMockNetwork - - @Before - fun before() { - mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts")) - } - - @After - fun stopNodes() { - mockNet.stopNodes() - } - - /** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */ - @Test - fun `all replicas start even if there is a new consensus during startup`() { - val clusterSize = minClusterSize(1) - val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race. - val f = node.run { - val trivialTx = signInitialTransaction(notary) { - addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) - } - // Create a new consensus while the redundant replica is sleeping: - services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture - } - mockNet.runNetwork() - f.getOrThrow() - } - - private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { - return services.signInitialTransaction( - TransactionBuilder(notary).apply { - addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey)) - block() - } - ) - } -} diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 157ef99bae..2b87983d05 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -146,7 +146,7 @@ open class NodeStartup(val args: Array) { private fun Exception.logAsExpected(message: String? = this.message, print: (String?) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]") - private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]", error) + private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message${this.message?.let { ": $it" } ?: ""} [errorCode=${errorCode()}]", error) private fun Exception.isOpenJdkKnownIssue() = message?.startsWith("Unknown named curve:") == true diff --git a/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt b/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt index 83f040f025..95cd54f2da 100644 --- a/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt +++ b/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt @@ -54,7 +54,6 @@ class TestScheme : AbstractKryoSerializationScheme() { override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - } @RunWith(Parameterized::class) @@ -99,7 +98,6 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null)) } - @Test fun `serialised form is stable when the same object instance is added to the deserialised object graph`() { val noReferencesContext = context.withoutReferences() @@ -366,4 +364,16 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertEquals(encodingNotPermittedFormat.format(compression), message) } } + + @Test + fun `compression reduces number of bytes significantly`() { + class Holder(val holder: ByteArray) + + val obj = Holder(ByteArray(20000)) + val uncompressedSize = obj.serialize(factory, context.withEncoding(null)).size + val compressedSize = obj.serialize(factory, context.withEncoding(CordaSerializationEncoding.SNAPPY)).size + // If these need fixing, sounds like Kryo wire format changed and checkpoints might not surive an upgrade. + assertEquals(20222, uncompressedSize) + assertEquals(1111, compressedSize) + } } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt index cb70b8053a..a90cf1b818 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt @@ -17,6 +17,7 @@ import net.corda.core.utilities.OpaqueBytes import net.corda.serialization.internal.OrdinalBits.OrdinalWriter import org.iq80.snappy.SnappyFramedInputStream import org.iq80.snappy.SnappyFramedOutputStream +import java.io.IOException import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer @@ -54,7 +55,7 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter { override fun wrap(stream: InputStream) = InflaterInputStream(stream) }, SNAPPY { - override fun wrap(stream: OutputStream) = SnappyFramedOutputStream(stream) + override fun wrap(stream: OutputStream) = FlushAverseOutputStream(SnappyFramedOutputStream(stream)) override fun wrap(stream: InputStream) = SnappyFramedInputStream(stream, false) }; @@ -68,3 +69,21 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter { } const val encodingNotPermittedFormat = "Encoding not permitted: %s" + +/** + * Has an empty flush implementation. This is because Kryo keeps calling flush all the time, which stops the Snappy + * stream from building up big chunks to compress and instead keeps compressing small chunks giving terrible compression ratio. + */ +class FlushAverseOutputStream(private val delegate: OutputStream) : OutputStream() { + @Throws(IOException::class) + override fun write(b: Int) = delegate.write(b) + + @Throws(IOException::class) + override fun write(b: ByteArray?, off: Int, len: Int) = delegate.write(b, off, len) + + @Throws(IOException::class) + override fun close() { + delegate.flush() + delegate.close() + } +} diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt index 0176dd63ca..1b3873ec92 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt @@ -77,6 +77,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe override fun withPreferredSerializationVersion(magic: SerializationMagic) = copy(preferredSerializationVersion = magic) override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding) + override fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist) = copy(encodingWhitelist = encodingWhitelist) } /* diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt index 2c1cfbadae..a48f92e331 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt @@ -39,9 +39,8 @@ data class ObjectAndEnvelope(val obj: T, val envelope: Envelope) * instances and threads. */ @KeepForDJVM -class DeserializationInput @JvmOverloads constructor( - private val serializerFactory: SerializerFactory, - private val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist +class DeserializationInput constructor( + private val serializerFactory: SerializerFactory ) { private val objectHistory: MutableList = mutableListOf() private val logger = loggerFor() @@ -90,9 +89,9 @@ class DeserializationInput @JvmOverloads constructor( } } - + @VisibleForTesting @Throws(AMQPNoTypeNotSerializableException::class) - fun getEnvelope(byteSequence: ByteSequence) = getEnvelope(byteSequence, encodingWhitelist) + fun getEnvelope(byteSequence: ByteSequence, context: SerializationContext) = getEnvelope(byteSequence, context.encodingWhitelist) @Throws( AMQPNotSerializableException::class, @@ -126,7 +125,7 @@ class DeserializationInput @JvmOverloads constructor( @Throws(NotSerializableException::class) fun deserialize(bytes: ByteSequence, clazz: Class, context: SerializationContext): T = des { - val envelope = getEnvelope(bytes, encodingWhitelist) + val envelope = getEnvelope(bytes, context.encodingWhitelist) logger.trace("deserialize blob scheme=\"${envelope.schema.toString()}\"") @@ -140,7 +139,7 @@ class DeserializationInput @JvmOverloads constructor( clazz: Class, context: SerializationContext ): ObjectAndEnvelope = des { - val envelope = getEnvelope(bytes, encodingWhitelist) + val envelope = getEnvelope(bytes, context.encodingWhitelist) // Now pick out the obj and schema from the envelope. ObjectAndEnvelope( clazz.cast(readObjectOrNull( diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt index 65a15562c8..a9d03a10a1 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt @@ -12,7 +12,6 @@ package net.corda.serialization.internal.amqp import net.corda.core.KeepForDJVM import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationEncoding import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.contextLogger import net.corda.serialization.internal.CordaSerializationEncoding @@ -38,9 +37,8 @@ data class BytesAndSchemas( * instances and threads. */ @KeepForDJVM -open class SerializationOutput @JvmOverloads constructor( - internal val serializerFactory: SerializerFactory, - private val encoding: SerializationEncoding? = null +open class SerializationOutput constructor( + internal val serializerFactory: SerializerFactory ) { companion object { private val logger = contextLogger() @@ -100,6 +98,7 @@ open class SerializationOutput @JvmOverloads constructor( var stream: OutputStream = it try { amqpMagic.writeTo(stream) + val encoding = context.encoding if (encoding != null) { SectionId.ENCODING.writeTo(stream) (encoding as CordaSerializationEncoding).writeTo(stream) diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt index a57604f88b..fd29f0d3af 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt @@ -18,9 +18,9 @@ import net.corda.node.services.statemachine.DataSessionMessage import net.corda.serialization.internal.amqp.DeserializationInput import net.corda.serialization.internal.amqp.Envelope import net.corda.serialization.internal.amqp.SerializerFactory +import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.amqpSpecific import net.corda.testing.internal.kryoSpecific -import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals @@ -38,7 +38,7 @@ class ListsSerializationTest { fun verifyEnvelope(serBytes: SerializedBytes, envVerBody: (Envelope) -> Unit) = amqpSpecific("AMQP specific envelope verification") { val context = SerializationFactory.defaultFactory.defaultContext - val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes) + val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes, context) envVerBody(envelope) } } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt index 841623eff4..243915b1f8 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt @@ -229,8 +229,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi freshDeserializationFactory: SerializerFactory = defaultFactory(), expectedEqual: Boolean = true, expectDeserializedEqual: Boolean = true): T { - val ser = SerializationOutput(factory, compression) - val bytes = ser.serialize(obj) + val ser = SerializationOutput(factory) + val bytes = ser.serialize(obj, compression) val decoder = DecoderImpl().apply { this.register(Envelope.DESCRIPTOR, Envelope.Companion) @@ -251,14 +251,14 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi val result = decoder.readObject() as Envelope assertNotNull(result) } - val des = DeserializationInput(freshDeserializationFactory, encodingWhitelist) - val desObj = des.deserialize(bytes) + val des = DeserializationInput(freshDeserializationFactory) + val desObj = des.deserialize(bytes, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) assertTrue(Objects.deepEquals(obj, desObj) == expectedEqual) // Now repeat with a re-used factory - val ser2 = SerializationOutput(factory, compression) - val des2 = DeserializationInput(factory, encodingWhitelist) - val desObj2 = des2.deserialize(ser2.serialize(obj)) + val ser2 = SerializationOutput(factory) + val des2 = DeserializationInput(factory) + val desObj2 = des2.deserialize(ser2.serialize(obj, compression), testSerializationContext.withEncodingWhitelist(encodingWhitelist)) assertTrue(Objects.deepEquals(obj, desObj2) == expectedEqual) assertTrue(Objects.deepEquals(desObj, desObj2) == expectDeserializedEqual) @@ -481,10 +481,10 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi @Test fun `class constructor is invoked on deserialisation`() { compression == null || return // Manipulation of serialized bytes is invalid if they're compressed. - val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()), compression) - val des = DeserializationInput(ser.serializerFactory, encodingWhitelist) - val serialisedOne = ser.serialize(NonZeroByte(1)).bytes - val serialisedTwo = ser.serialize(NonZeroByte(2)).bytes + val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())) + val des = DeserializationInput(ser.serializerFactory) + val serialisedOne = ser.serialize(NonZeroByte(1), compression).bytes + val serialisedTwo = ser.serialize(NonZeroByte(2), compression).bytes // Find the index that holds the value byte val valueIndex = serialisedOne.zip(serialisedTwo).mapIndexedNotNull { index, (oneByte, twoByte) -> @@ -495,12 +495,12 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi // Double check copy[valueIndex] = 0x03 - assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext).value).isEqualTo(3) + assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist)).value).isEqualTo(3) // Now use the forbidden value copy[valueIndex] = 0x00 assertThatExceptionOfType(NotSerializableException::class.java).isThrownBy { - des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext) + des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) }.withStackTraceContaining("Zero not allowed") } @@ -1208,7 +1208,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi val c = C(Amount(100, BigDecimal("1.5"), Currency.getInstance("USD"))) // were the issue not fixed we'd blow up here - SerializationOutput(factory, compression).serialize(c) + SerializationOutput(factory).serialize(c, compression) } @Test @@ -1216,9 +1216,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi compression ?: return val factory = defaultFactory() val data = ByteArray(12345).also { Random(0).nextBytes(it) }.let { it + it } - val compressed = SerializationOutput(factory, compression).serialize(data) + val compressed = SerializationOutput(factory).serialize(data, compression) assertEquals(.5, compressed.size.toDouble() / data.size, .03) - assertArrayEquals(data, DeserializationInput(factory, encodingWhitelist).deserialize(compressed)) + assertArrayEquals(data, DeserializationInput(factory).deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist))) } @Test @@ -1226,9 +1226,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi compression ?: return val factory = defaultFactory() doReturn(false).whenever(encodingWhitelist).acceptEncoding(compression) - val compressed = SerializationOutput(factory, compression).serialize("whatever") - val input = DeserializationInput(factory, encodingWhitelist) - catchThrowable { input.deserialize(compressed) }.run { + val compressed = SerializationOutput(factory).serialize("whatever", compression) + val input = DeserializationInput(factory) + catchThrowable { input.deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) }.run { assertSame(NotSerializableException::class.java, javaClass) assertEquals(encodingNotPermittedFormat.format(compression), message) } @@ -1358,5 +1358,16 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi throw Error("Deserializing serialized \$C should not throw") } } + + @Test + fun `compression reduces number of bytes significantly`() { + val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())) + val obj = ByteArray(20000) + val uncompressedSize = ser.serialize(obj).bytes.size + val compressedSize = ser.serialize(obj, CordaSerializationEncoding.SNAPPY).bytes.size + // Ordinarily this might be considered high maintenance, but we promised wire compatibility, so they'd better not change! + assertEquals(20059, uncompressedSize) + assertEquals(1018, compressedSize) + } } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt index 9f300fd9e8..e3820e2d6b 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt @@ -4,6 +4,7 @@ import net.corda.core.internal.copyTo import net.corda.core.internal.div import net.corda.core.internal.packageName import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationEncoding import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.OpaqueBytes import net.corda.serialization.internal.AllWhitelist @@ -98,9 +99,9 @@ fun SerializationOutput.serializeAndReturnSchema( @Throws(NotSerializableException::class) -fun SerializationOutput.serialize(obj: T): SerializedBytes { +fun SerializationOutput.serialize(obj: T, encoding: SerializationEncoding? = null): SerializedBytes { try { - return _serialize(obj, testSerializationContext) + return _serialize(obj, testSerializationContext.withEncoding(encoding)) } finally { andFinally() } diff --git a/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt b/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt index 94e1a45747..655ddb830c 100644 --- a/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt +++ b/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt @@ -83,7 +83,7 @@ class BlobInspector : Runnable { ?: throw IllegalArgumentException("Error: this input does not appear to be encoded in Corda's AMQP extended format, sorry.") if (schema) { - val envelope = DeserializationInput.getEnvelope(bytes.sequence()) + val envelope = DeserializationInput.getEnvelope(bytes.sequence(), SerializationDefaults.STORAGE_CONTEXT.encodingWhitelist) out.println(envelope.schema) out.println() out.println(envelope.transformsSchema) diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt index 5bd2bd9f84..12c95ad963 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt @@ -53,7 +53,7 @@ class Explorer internal constructor(private val explorerController: ExplorerCont val user = config.nodeConfig.rpcUsers[0] val p = explorerController.process( "--host=localhost", - "--port=${config.nodeConfig.rpcAddress.port}", + "--port=${config.nodeConfig.rpcSettings.address.port}", "--username=${user.username}", "--password=${user.password}") .directory(explorerDir.toFile()) diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt index 3ea7c25306..1497816aee 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt @@ -32,7 +32,7 @@ class InstallFactory : Controller() { val nodeConfig = config.parseAs(UnknownConfigKeysPolicy.IGNORE::handle) nodeConfig.p2pAddress.checkPort() - nodeConfig.rpcAddress.checkPort() + nodeConfig.rpcSettings.address.checkPort() nodeConfig.webAddress.checkPort() val tempDir = Files.createTempDirectory(baseDir, ".node") diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt index 0513b3a071..96259d1907 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt @@ -30,8 +30,7 @@ import java.util.Properties data class NodeConfig( val myLegalName: CordaX500Name, val p2pAddress: NetworkHostAndPort, - val rpcAddress: NetworkHostAndPort, - val rpcAdminAddress: NetworkHostAndPort, + val rpcSettings: NodeRpcSettings, /** This is not used by the node but by the webserver which looks at node.conf. */ val webAddress: NetworkHostAndPort, val notary: NotaryService?, @@ -54,8 +53,8 @@ data class NodeConfig( fun nodeConf(): Config { val rpcSettings: ConfigObject = empty() - .withValue("address", valueFor(rpcAddress.toString())) - .withValue("adminAddress", valueFor(rpcAdminAddress.toString())) + .withValue("address", valueFor(rpcSettings.address.toString())) + .withValue("adminAddress", valueFor(rpcSettings.adminAddress.toString())) .root() val customMap: Map = HashMap().also { if (issuableCurrencies.isNotEmpty()) { @@ -63,7 +62,7 @@ data class NodeConfig( } } val custom: ConfigObject = ConfigFactory.parseMap(customMap).root() - return NodeConfigurationData(myLegalName, p2pAddress, rpcAddress, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode) + return NodeConfigurationData(myLegalName, p2pAddress, this.rpcSettings.address, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode) .toConfig() .withoutPath("rpcAddress") .withoutPath("rpcAdminAddress") @@ -71,7 +70,7 @@ data class NodeConfig( .withOptionalValue("custom", custom) } - fun webServerConf() = WebServerConfigurationData(myLegalName, rpcAddress, webAddress, rpcUsers).asConfig() + fun webServerConf() = WebServerConfigurationData(myLegalName, rpcSettings.address, webAddress, rpcUsers).asConfig() fun toNodeConfText() = nodeConf().render() diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt index 6037d425d2..127bdc8304 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt @@ -80,8 +80,10 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { country = location.countryCode ), p2pAddress = nodeData.p2pPort.toLocalAddress(), - rpcAddress = nodeData.rpcPort.toLocalAddress(), - rpcAdminAddress = nodeData.rpcAdminPort.toLocalAddress(), + rpcSettings = NodeRpcSettings( + address = nodeData.rpcPort.toLocalAddress(), + adminAddress = nodeData.rpcAdminPort.toLocalAddress() + ), webAddress = nodeData.webPort.toLocalAddress(), notary = notary, h2port = nodeData.h2Port.value, @@ -212,7 +214,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { } private fun updatePort(config: NodeConfig) { - val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcAddress.port, config.webAddress.port, config.h2port).max() as Int + val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcSettings.address.port, config.webAddress.port, config.h2port).max() as Int port.getAndUpdate { Math.max(nextPort, it) } } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt new file mode 100644 index 0000000000..04d093a318 --- /dev/null +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt @@ -0,0 +1,8 @@ +package net.corda.demobench.model + +import net.corda.core.utilities.NetworkHostAndPort + +data class NodeRpcSettings( + val address: NetworkHostAndPort, + val adminAddress: NetworkHostAndPort +) \ No newline at end of file diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt index 89132893e2..d0b5df0305 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt @@ -25,7 +25,7 @@ class NodeRPC(config: NodeConfigWrapper, start: (NodeConfigWrapper, CordaRPCOps) private val oneSecond = SECONDS.toMillis(1) } - private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcAddress.port)) + private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcSettings.address.port)) @Volatile private var rpcConnection: CordaRPCConnection? = null private val timer = Timer("DemoBench NodeRPC (${config.key})", true) diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt index ee066821cc..e84be63388 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt @@ -21,7 +21,10 @@ import net.corda.webserver.WebServerConfig import org.junit.Test import java.nio.file.Path import java.nio.file.Paths -import kotlin.test.* +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue class NodeConfigTest { companion object { @@ -32,21 +35,21 @@ class NodeConfigTest { @Test fun `reading node configuration`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 40002, - rpcAdminPort = 40005, - webPort = 20001, - h2port = 30001, - notary = NotaryService(validating = false), - users = listOf(user("jenny")) + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 40002, + rpcAdminPort = 40005, + webPort = 20001, + h2port = 30001, + notary = NotaryService(validating = false), + users = listOf(user("jenny")) ) val nodeConfig = config.nodeConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("reference.conf")) - .withFallback(ConfigFactory.parseMap(mapOf("devMode" to true))) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("reference.conf")) + .withFallback(ConfigFactory.parseMap(mapOf("devMode" to true))) + .resolve() val fullConfig = nodeConfig.parseAsNodeConfiguration() // No custom configuration is created by default. @@ -63,20 +66,20 @@ class NodeConfigTest { @Test fun `reading node configuration with currencies`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 10002, - rpcAdminPort = 10003, - webPort = 10004, - h2port = 10005, - notary = NotaryService(validating = false), - issuableCurrencies = listOf("GBP") + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 10002, + rpcAdminPort = 10003, + webPort = 10004, + h2port = 10005, + notary = NotaryService(validating = false), + issuableCurrencies = listOf("GBP") ) val nodeConfig = config.nodeConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("reference.conf")) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("reference.conf")) + .resolve() val custom = nodeConfig.getConfig("custom") assertEquals(listOf("GBP"), custom.getAnyRefList("issuableCurrencies")) } @@ -84,20 +87,20 @@ class NodeConfigTest { @Test fun `reading webserver configuration`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 40002, - rpcAdminPort = 40003, - webPort = 20001, - h2port = 30001, - notary = NotaryService(validating = false), - users = listOf(user("jenny")) + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 40002, + rpcAdminPort = 40003, + webPort = 20001, + h2port = 30001, + notary = NotaryService(validating = false), + users = listOf(user("jenny")) ) val nodeConfig = config.webServerConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("web-reference.conf")) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("web-reference.conf")) + .resolve() val webConfig = WebServerConfig(baseDir, nodeConfig) // No custom configuration is created by default. @@ -110,26 +113,28 @@ class NodeConfigTest { } private fun createConfig( - legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"), - p2pPort: Int = -1, - rpcPort: Int = -1, - rpcAdminPort: Int = -1, - webPort: Int = -1, - h2port: Int = -1, - notary: NotaryService?, - users: List = listOf(user("guest")), - issuableCurrencies: List = emptyList() + legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"), + p2pPort: Int = -1, + rpcPort: Int = -1, + rpcAdminPort: Int = -1, + webPort: Int = -1, + h2port: Int = -1, + notary: NotaryService?, + users: List = listOf(user("guest")), + issuableCurrencies: List = emptyList() ): NodeConfig { return NodeConfig( - myLegalName = legalName, - p2pAddress = localPort(p2pPort), - rpcAddress = localPort(rpcPort), - rpcAdminAddress = localPort(rpcAdminPort), - webAddress = localPort(webPort), - h2port = h2port, - notary = notary, - rpcUsers = users, - issuableCurrencies = issuableCurrencies + myLegalName = legalName, + p2pAddress = localPort(p2pPort), + rpcSettings = NodeRpcSettings( + address = localPort(rpcPort), + adminAddress = localPort(rpcAdminPort) + ), + webAddress = localPort(webPort), + h2port = h2port, + notary = notary, + rpcUsers = users, + issuableCurrencies = issuableCurrencies ) } diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt index 2a796b4e1c..17da7915ae 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt @@ -173,8 +173,10 @@ class NodeControllerTest { country = "US" ), p2pAddress = localPort(p2pPort), - rpcAddress = localPort(rpcPort), - rpcAdminAddress = localPort(rpcAdminPort), + rpcSettings = NodeRpcSettings( + address = localPort(rpcPort), + adminAddress = localPort(rpcAdminPort) + ), webAddress = localPort(webPort), h2port = h2port, notary = notary,