From 41fc8d6c55aa30dcff3ec7e63f97b8bb1f92c12e Mon Sep 17 00:00:00 2001 From: Florian Friemel Date: Tue, 21 Aug 2018 13:02:18 +0100 Subject: [PATCH 1/6] Remove BFTSMaRtTests. (#3822) --- .../net/corda/node/services/BFTSMaRtTests.kt | 59 ------------------- 1 file changed, 59 deletions(-) delete mode 100644 node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt deleted file mode 100644 index c33ec2b2ac..0000000000 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTSMaRtTests.kt +++ /dev/null @@ -1,59 +0,0 @@ -package net.corda.node.services - -import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint -import net.corda.core.flows.NotaryFlow -import net.corda.core.identity.Party -import net.corda.core.transactions.SignedTransaction -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.getOrThrow -import net.corda.node.services.BFTNotaryServiceTests.Companion.startBftClusterAndNode -import net.corda.node.services.transactions.minClusterSize -import net.corda.testing.contracts.DummyContract -import net.corda.testing.core.dummyCommand -import net.corda.testing.core.singleIdentity -import net.corda.testing.node.internal.cordappsForPackages -import net.corda.testing.node.internal.InternalMockNetwork -import net.corda.testing.node.internal.TestStartedNode -import net.corda.testing.node.internal.startFlow -import org.junit.After -import org.junit.Before -import org.junit.Test - -class BFTSMaRtTests { - private lateinit var mockNet: InternalMockNetwork - - @Before - fun before() { - mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts")) - } - - @After - fun stopNodes() { - mockNet.stopNodes() - } - - /** Failure mode is the redundant replica gets stuck in startup, so we can't dispose it cleanly at the end. */ - @Test - fun `all replicas start even if there is a new consensus during startup`() { - val clusterSize = minClusterSize(1) - val (notary, node) = startBftClusterAndNode(clusterSize, mockNet, exposeRaces = true) // This true adds a sleep to expose the race. - val f = node.run { - val trivialTx = signInitialTransaction(notary) { - addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) - } - // Create a new consensus while the redundant replica is sleeping: - services.startFlow(NotaryFlow.Client(trivialTx)).resultFuture - } - mockNet.runNetwork() - f.getOrThrow() - } - - private fun TestStartedNode.signInitialTransaction(notary: Party, block: TransactionBuilder.() -> Any?): SignedTransaction { - return services.signInitialTransaction( - TransactionBuilder(notary).apply { - addCommand(dummyCommand(services.myInfo.singleIdentity().owningKey)) - block() - } - ) - } -} From f94abf726b83ba18baa827173c07b6e05496188f Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Tue, 21 Aug 2018 21:28:13 +0100 Subject: [PATCH 2/6] Fixing exception messages not showing up in the console because of log4j2.xml setup. (#3830) --- node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index f5b2ad7a87..e35ea0db93 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -132,7 +132,7 @@ open class NodeStartup(val args: Array) { private fun Exception.logAsExpected(message: String? = this.message, print: (String?) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]") - private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message [errorCode=${errorCode()}]", error) + private fun Exception.logAsUnexpected(message: String? = this.message, error: Exception = this, print: (String?, Throwable) -> Unit = logger::error) = print("$message${this.message?.let { ": $it" } ?: ""} [errorCode=${errorCode()}]", error) private fun Exception.isOpenJdkKnownIssue() = message?.startsWith("Unknown named curve:") == true From 96d645c316d0abac4cbe1c15113c7d73a596e75e Mon Sep 17 00:00:00 2001 From: Matthew Layton Date: Tue, 10 Jul 2018 10:24:59 +0100 Subject: [PATCH 3/6] series0ne/corda-demobench-node-config-fix Fixes an issue where profiles don't load because the node.conf format has changed between V1 and V3 --- .../net/corda/demobench/explorer/Explorer.kt | 2 +- .../corda/demobench/model/InstallFactory.kt | 2 +- .../net/corda/demobench/model/NodeConfig.kt | 11 +- .../corda/demobench/model/NodeController.kt | 8 +- .../corda/demobench/model/NodeRpcSettings.kt | 8 ++ .../kotlin/net/corda/demobench/rpc/NodeRPC.kt | 2 +- .../corda/demobench/model/NodeConfigTest.kt | 111 +++++++++--------- .../demobench/model/NodeControllerTest.kt | 6 +- 8 files changed, 83 insertions(+), 67 deletions(-) create mode 100644 tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt index eb28552c88..4606621280 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/explorer/Explorer.kt @@ -43,7 +43,7 @@ class Explorer internal constructor(private val explorerController: ExplorerCont val user = config.nodeConfig.rpcUsers[0] val p = explorerController.process( "--host=localhost", - "--port=${config.nodeConfig.rpcAddress.port}", + "--port=${config.nodeConfig.rpcSettings.address.port}", "--username=${user.username}", "--password=${user.password}") .directory(explorerDir.toFile()) diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt index 86f987c33a..68c5e0a42b 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/InstallFactory.kt @@ -22,7 +22,7 @@ class InstallFactory : Controller() { val nodeConfig = config.parseAs(UnknownConfigKeysPolicy.IGNORE::handle) nodeConfig.p2pAddress.checkPort() - nodeConfig.rpcAddress.checkPort() + nodeConfig.rpcSettings.address.checkPort() nodeConfig.webAddress.checkPort() val tempDir = Files.createTempDirectory(baseDir, ".node") diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt index 0270d12cfb..9f63d8f38b 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeConfig.kt @@ -20,8 +20,7 @@ import java.util.Properties data class NodeConfig( val myLegalName: CordaX500Name, val p2pAddress: NetworkHostAndPort, - val rpcAddress: NetworkHostAndPort, - val rpcAdminAddress: NetworkHostAndPort, + val rpcSettings: NodeRpcSettings, /** This is not used by the node but by the webserver which looks at node.conf. */ val webAddress: NetworkHostAndPort, val notary: NotaryService?, @@ -44,8 +43,8 @@ data class NodeConfig( fun nodeConf(): Config { val rpcSettings: ConfigObject = empty() - .withValue("address", valueFor(rpcAddress.toString())) - .withValue("adminAddress", valueFor(rpcAdminAddress.toString())) + .withValue("address", valueFor(rpcSettings.address.toString())) + .withValue("adminAddress", valueFor(rpcSettings.adminAddress.toString())) .root() val customMap: Map = HashMap().also { if (issuableCurrencies.isNotEmpty()) { @@ -53,7 +52,7 @@ data class NodeConfig( } } val custom: ConfigObject = ConfigFactory.parseMap(customMap).root() - return NodeConfigurationData(myLegalName, p2pAddress, rpcAddress, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode) + return NodeConfigurationData(myLegalName, p2pAddress, this.rpcSettings.address, notary, h2port, rpcUsers, useTestClock, detectPublicIp, devMode) .toConfig() .withoutPath("rpcAddress") .withoutPath("rpcAdminAddress") @@ -61,7 +60,7 @@ data class NodeConfig( .withOptionalValue("custom", custom) } - fun webServerConf() = WebServerConfigurationData(myLegalName, rpcAddress, webAddress, rpcUsers).asConfig() + fun webServerConf() = WebServerConfigurationData(myLegalName, rpcSettings.address, webAddress, rpcUsers).asConfig() fun toNodeConfText() = nodeConf().render() diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt index 7ffff3cfbb..aa25cc163c 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeController.kt @@ -70,8 +70,10 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { country = location.countryCode ), p2pAddress = nodeData.p2pPort.toLocalAddress(), - rpcAddress = nodeData.rpcPort.toLocalAddress(), - rpcAdminAddress = nodeData.rpcAdminPort.toLocalAddress(), + rpcSettings = NodeRpcSettings( + address = nodeData.rpcPort.toLocalAddress(), + adminAddress = nodeData.rpcAdminPort.toLocalAddress() + ), webAddress = nodeData.webPort.toLocalAddress(), notary = notary, h2port = nodeData.h2Port.value, @@ -202,7 +204,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() { } private fun updatePort(config: NodeConfig) { - val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcAddress.port, config.webAddress.port, config.h2port).max() as Int + val nextPort = 1 + arrayOf(config.p2pAddress.port, config.rpcSettings.address.port, config.webAddress.port, config.h2port).max() as Int port.getAndUpdate { Math.max(nextPort, it) } } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt new file mode 100644 index 0000000000..04d093a318 --- /dev/null +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/model/NodeRpcSettings.kt @@ -0,0 +1,8 @@ +package net.corda.demobench.model + +import net.corda.core.utilities.NetworkHostAndPort + +data class NodeRpcSettings( + val address: NetworkHostAndPort, + val adminAddress: NetworkHostAndPort +) \ No newline at end of file diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt index 9300fc9643..c762a1e132 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt @@ -15,7 +15,7 @@ class NodeRPC(config: NodeConfigWrapper, start: (NodeConfigWrapper, CordaRPCOps) private val oneSecond = SECONDS.toMillis(1) } - private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcAddress.port)) + private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcSettings.address.port)) @Volatile private var rpcConnection: CordaRPCConnection? = null private val timer = Timer("DemoBench NodeRPC (${config.key})", true) diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt index d85acad092..d044c63f14 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeConfigTest.kt @@ -10,7 +10,10 @@ import net.corda.webserver.WebServerConfig import org.junit.Test import java.nio.file.Path import java.nio.file.Paths -import kotlin.test.* +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue class NodeConfigTest { companion object { @@ -21,21 +24,21 @@ class NodeConfigTest { @Test fun `reading node configuration`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 40002, - rpcAdminPort = 40005, - webPort = 20001, - h2port = 30001, - notary = NotaryService(validating = false), - users = listOf(user("jenny")) + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 40002, + rpcAdminPort = 40005, + webPort = 20001, + h2port = 30001, + notary = NotaryService(validating = false), + users = listOf(user("jenny")) ) val nodeConfig = config.nodeConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("reference.conf")) - .withFallback(ConfigFactory.parseMap(mapOf("devMode" to true))) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("reference.conf")) + .withFallback(ConfigFactory.parseMap(mapOf("devMode" to true))) + .resolve() val fullConfig = nodeConfig.parseAsNodeConfiguration() // No custom configuration is created by default. @@ -52,20 +55,20 @@ class NodeConfigTest { @Test fun `reading node configuration with currencies`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 10002, - rpcAdminPort = 10003, - webPort = 10004, - h2port = 10005, - notary = NotaryService(validating = false), - issuableCurrencies = listOf("GBP") + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 10002, + rpcAdminPort = 10003, + webPort = 10004, + h2port = 10005, + notary = NotaryService(validating = false), + issuableCurrencies = listOf("GBP") ) val nodeConfig = config.nodeConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("reference.conf")) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("reference.conf")) + .resolve() val custom = nodeConfig.getConfig("custom") assertEquals(listOf("GBP"), custom.getAnyRefList("issuableCurrencies")) } @@ -73,20 +76,20 @@ class NodeConfigTest { @Test fun `reading webserver configuration`() { val config = createConfig( - legalName = myLegalName, - p2pPort = 10001, - rpcPort = 40002, - rpcAdminPort = 40003, - webPort = 20001, - h2port = 30001, - notary = NotaryService(validating = false), - users = listOf(user("jenny")) + legalName = myLegalName, + p2pPort = 10001, + rpcPort = 40002, + rpcAdminPort = 40003, + webPort = 20001, + h2port = 30001, + notary = NotaryService(validating = false), + users = listOf(user("jenny")) ) val nodeConfig = config.webServerConf() - .withValue("baseDirectory", valueFor(baseDir.toString())) - .withFallback(ConfigFactory.parseResources("web-reference.conf")) - .resolve() + .withValue("baseDirectory", valueFor(baseDir.toString())) + .withFallback(ConfigFactory.parseResources("web-reference.conf")) + .resolve() val webConfig = WebServerConfig(baseDir, nodeConfig) // No custom configuration is created by default. @@ -99,26 +102,28 @@ class NodeConfigTest { } private fun createConfig( - legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"), - p2pPort: Int = -1, - rpcPort: Int = -1, - rpcAdminPort: Int = -1, - webPort: Int = -1, - h2port: Int = -1, - notary: NotaryService?, - users: List = listOf(user("guest")), - issuableCurrencies: List = emptyList() + legalName: CordaX500Name = CordaX500Name(organisation = "Unknown", locality = "Nowhere", country = "GB"), + p2pPort: Int = -1, + rpcPort: Int = -1, + rpcAdminPort: Int = -1, + webPort: Int = -1, + h2port: Int = -1, + notary: NotaryService?, + users: List = listOf(user("guest")), + issuableCurrencies: List = emptyList() ): NodeConfig { return NodeConfig( - myLegalName = legalName, - p2pAddress = localPort(p2pPort), - rpcAddress = localPort(rpcPort), - rpcAdminAddress = localPort(rpcAdminPort), - webAddress = localPort(webPort), - h2port = h2port, - notary = notary, - rpcUsers = users, - issuableCurrencies = issuableCurrencies + myLegalName = legalName, + p2pAddress = localPort(p2pPort), + rpcSettings = NodeRpcSettings( + address = localPort(rpcPort), + adminAddress = localPort(rpcAdminPort) + ), + webAddress = localPort(webPort), + h2port = h2port, + notary = notary, + rpcUsers = users, + issuableCurrencies = issuableCurrencies ) } diff --git a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt index e6b5454144..46bbde3dfb 100644 --- a/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt +++ b/tools/demobench/src/test/kotlin/net/corda/demobench/model/NodeControllerTest.kt @@ -163,8 +163,10 @@ class NodeControllerTest { country = "US" ), p2pAddress = localPort(p2pPort), - rpcAddress = localPort(rpcPort), - rpcAdminAddress = localPort(rpcAdminPort), + rpcSettings = NodeRpcSettings( + address = localPort(rpcPort), + adminAddress = localPort(rpcAdminPort) + ), webAddress = localPort(webPort), h2port = h2port, notary = notary, From 1d05c16942a1f5815bc430232d4cc5d83e683ab8 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Wed, 22 Aug 2018 10:37:18 +0100 Subject: [PATCH 4/6] ENT-2439 Fix compression in serialization (#3825) * ENT-2439 Fix compression in serialization --- .../core/serialization/SerializationAPI.kt | 5 ++ .../node/serialization/kryo/KryoTests.kt | 14 +++++- .../internal/SerializationFormat.kt | 21 +++++++- .../internal/SerializationScheme.kt | 1 + .../internal/amqp/DeserializationInput.kt | 13 +++-- .../internal/amqp/SerializationOutput.kt | 7 ++- .../internal/ListsSerializationTest.kt | 4 +- .../internal/amqp/SerializationOutputTests.kt | 49 ++++++++++++------- .../internal/amqp/testutils/AMQPTestUtils.kt | 5 +- .../net/corda/blobinspector/BlobInspector.kt | 2 +- 10 files changed, 83 insertions(+), 38 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt index c1332c8954..c5df7f7069 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt @@ -198,6 +198,11 @@ interface SerializationContext { */ fun withEncoding(encoding: SerializationEncoding?): SerializationContext + /** + * A shallow copy of this context but with the given encoding whitelist. + */ + fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist): SerializationContext + /** * The use case that we are serializing for, since it influences the implementations chosen. */ diff --git a/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt b/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt index 4570ea1495..b15374b7ac 100644 --- a/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt +++ b/node/src/test/kotlin/net/corda/node/serialization/kryo/KryoTests.kt @@ -44,7 +44,6 @@ class TestScheme : AbstractKryoSerializationScheme() { override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - } @RunWith(Parameterized::class) @@ -89,7 +88,6 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null)) } - @Test fun `serialised form is stable when the same object instance is added to the deserialised object graph`() { val noReferencesContext = context.withoutReferences() @@ -356,4 +354,16 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertEquals(encodingNotPermittedFormat.format(compression), message) } } + + @Test + fun `compression reduces number of bytes significantly`() { + class Holder(val holder: ByteArray) + + val obj = Holder(ByteArray(20000)) + val uncompressedSize = obj.serialize(factory, context.withEncoding(null)).size + val compressedSize = obj.serialize(factory, context.withEncoding(CordaSerializationEncoding.SNAPPY)).size + // If these need fixing, sounds like Kryo wire format changed and checkpoints might not surive an upgrade. + assertEquals(20222, uncompressedSize) + assertEquals(1111, compressedSize) + } } \ No newline at end of file diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt index d275643fc3..7eb236f23d 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationFormat.kt @@ -7,6 +7,7 @@ import net.corda.core.utilities.OpaqueBytes import net.corda.serialization.internal.OrdinalBits.OrdinalWriter import org.iq80.snappy.SnappyFramedInputStream import org.iq80.snappy.SnappyFramedOutputStream +import java.io.IOException import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer @@ -44,7 +45,7 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter { override fun wrap(stream: InputStream) = InflaterInputStream(stream) }, SNAPPY { - override fun wrap(stream: OutputStream) = SnappyFramedOutputStream(stream) + override fun wrap(stream: OutputStream) = FlushAverseOutputStream(SnappyFramedOutputStream(stream)) override fun wrap(stream: InputStream) = SnappyFramedInputStream(stream, false) }; @@ -58,3 +59,21 @@ enum class CordaSerializationEncoding : SerializationEncoding, OrdinalWriter { } const val encodingNotPermittedFormat = "Encoding not permitted: %s" + +/** + * Has an empty flush implementation. This is because Kryo keeps calling flush all the time, which stops the Snappy + * stream from building up big chunks to compress and instead keeps compressing small chunks giving terrible compression ratio. + */ +class FlushAverseOutputStream(private val delegate: OutputStream) : OutputStream() { + @Throws(IOException::class) + override fun write(b: Int) = delegate.write(b) + + @Throws(IOException::class) + override fun write(b: ByteArray?, off: Int, len: Int) = delegate.write(b, off, len) + + @Throws(IOException::class) + override fun close() { + delegate.flush() + delegate.close() + } +} diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt index c3f9f1e31e..9c78e01926 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/SerializationScheme.kt @@ -67,6 +67,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe override fun withPreferredSerializationVersion(magic: SerializationMagic) = copy(preferredSerializationVersion = magic) override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding) + override fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist) = copy(encodingWhitelist = encodingWhitelist) } /* diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt index cfa2065c4b..126400f31d 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt @@ -29,9 +29,8 @@ data class ObjectAndEnvelope(val obj: T, val envelope: Envelope) * instances and threads. */ @KeepForDJVM -class DeserializationInput @JvmOverloads constructor( - private val serializerFactory: SerializerFactory, - private val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist +class DeserializationInput constructor( + private val serializerFactory: SerializerFactory ) { private val objectHistory: MutableList = mutableListOf() private val logger = loggerFor() @@ -80,9 +79,9 @@ class DeserializationInput @JvmOverloads constructor( } } - + @VisibleForTesting @Throws(AMQPNoTypeNotSerializableException::class) - fun getEnvelope(byteSequence: ByteSequence) = getEnvelope(byteSequence, encodingWhitelist) + fun getEnvelope(byteSequence: ByteSequence, context: SerializationContext) = getEnvelope(byteSequence, context.encodingWhitelist) @Throws( AMQPNotSerializableException::class, @@ -116,7 +115,7 @@ class DeserializationInput @JvmOverloads constructor( @Throws(NotSerializableException::class) fun deserialize(bytes: ByteSequence, clazz: Class, context: SerializationContext): T = des { - val envelope = getEnvelope(bytes, encodingWhitelist) + val envelope = getEnvelope(bytes, context.encodingWhitelist) logger.trace("deserialize blob scheme=\"${envelope.schema.toString()}\"") @@ -130,7 +129,7 @@ class DeserializationInput @JvmOverloads constructor( clazz: Class, context: SerializationContext ): ObjectAndEnvelope = des { - val envelope = getEnvelope(bytes, encodingWhitelist) + val envelope = getEnvelope(bytes, context.encodingWhitelist) // Now pick out the obj and schema from the envelope. ObjectAndEnvelope( clazz.cast(readObjectOrNull( diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt index 080fdbc8a1..d24e5ea77b 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializationOutput.kt @@ -2,7 +2,6 @@ package net.corda.serialization.internal.amqp import net.corda.core.KeepForDJVM import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationEncoding import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.contextLogger import net.corda.serialization.internal.CordaSerializationEncoding @@ -28,9 +27,8 @@ data class BytesAndSchemas( * instances and threads. */ @KeepForDJVM -open class SerializationOutput @JvmOverloads constructor( - internal val serializerFactory: SerializerFactory, - private val encoding: SerializationEncoding? = null +open class SerializationOutput constructor( + internal val serializerFactory: SerializerFactory ) { companion object { private val logger = contextLogger() @@ -90,6 +88,7 @@ open class SerializationOutput @JvmOverloads constructor( var stream: OutputStream = it try { amqpMagic.writeTo(stream) + val encoding = context.encoding if (encoding != null) { SectionId.ENCODING.writeTo(stream) (encoding as CordaSerializationEncoding).writeTo(stream) diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt index 97ccea49a7..60f71520fe 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/ListsSerializationTest.kt @@ -8,9 +8,9 @@ import net.corda.node.services.statemachine.DataSessionMessage import net.corda.serialization.internal.amqp.DeserializationInput import net.corda.serialization.internal.amqp.Envelope import net.corda.serialization.internal.amqp.SerializerFactory +import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.amqpSpecific import net.corda.testing.internal.kryoSpecific -import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals @@ -28,7 +28,7 @@ class ListsSerializationTest { fun verifyEnvelope(serBytes: SerializedBytes, envVerBody: (Envelope) -> Unit) = amqpSpecific("AMQP specific envelope verification") { val context = SerializationFactory.defaultFactory.defaultContext - val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes) + val envelope = DeserializationInput(SerializerFactory(context.whitelist, context.deserializationClassLoader)).getEnvelope(serBytes, context) envVerBody(envelope) } } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt index c0ffdfc04a..37bf6e8b47 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt @@ -219,8 +219,8 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi freshDeserializationFactory: SerializerFactory = defaultFactory(), expectedEqual: Boolean = true, expectDeserializedEqual: Boolean = true): T { - val ser = SerializationOutput(factory, compression) - val bytes = ser.serialize(obj) + val ser = SerializationOutput(factory) + val bytes = ser.serialize(obj, compression) val decoder = DecoderImpl().apply { this.register(Envelope.DESCRIPTOR, Envelope.Companion) @@ -241,14 +241,14 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi val result = decoder.readObject() as Envelope assertNotNull(result) } - val des = DeserializationInput(freshDeserializationFactory, encodingWhitelist) - val desObj = des.deserialize(bytes) + val des = DeserializationInput(freshDeserializationFactory) + val desObj = des.deserialize(bytes, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) assertTrue(Objects.deepEquals(obj, desObj) == expectedEqual) // Now repeat with a re-used factory - val ser2 = SerializationOutput(factory, compression) - val des2 = DeserializationInput(factory, encodingWhitelist) - val desObj2 = des2.deserialize(ser2.serialize(obj)) + val ser2 = SerializationOutput(factory) + val des2 = DeserializationInput(factory) + val desObj2 = des2.deserialize(ser2.serialize(obj, compression), testSerializationContext.withEncodingWhitelist(encodingWhitelist)) assertTrue(Objects.deepEquals(obj, desObj2) == expectedEqual) assertTrue(Objects.deepEquals(desObj, desObj2) == expectDeserializedEqual) @@ -471,10 +471,10 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi @Test fun `class constructor is invoked on deserialisation`() { compression == null || return // Manipulation of serialized bytes is invalid if they're compressed. - val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader()), compression) - val des = DeserializationInput(ser.serializerFactory, encodingWhitelist) - val serialisedOne = ser.serialize(NonZeroByte(1)).bytes - val serialisedTwo = ser.serialize(NonZeroByte(2)).bytes + val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())) + val des = DeserializationInput(ser.serializerFactory) + val serialisedOne = ser.serialize(NonZeroByte(1), compression).bytes + val serialisedTwo = ser.serialize(NonZeroByte(2), compression).bytes // Find the index that holds the value byte val valueIndex = serialisedOne.zip(serialisedTwo).mapIndexedNotNull { index, (oneByte, twoByte) -> @@ -485,12 +485,12 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi // Double check copy[valueIndex] = 0x03 - assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext).value).isEqualTo(3) + assertThat(des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist)).value).isEqualTo(3) // Now use the forbidden value copy[valueIndex] = 0x00 assertThatExceptionOfType(NotSerializableException::class.java).isThrownBy { - des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext) + des.deserialize(OpaqueBytes(copy), NonZeroByte::class.java, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) }.withStackTraceContaining("Zero not allowed") } @@ -1198,7 +1198,7 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi val c = C(Amount(100, BigDecimal("1.5"), Currency.getInstance("USD"))) // were the issue not fixed we'd blow up here - SerializationOutput(factory, compression).serialize(c) + SerializationOutput(factory).serialize(c, compression) } @Test @@ -1206,9 +1206,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi compression ?: return val factory = defaultFactory() val data = ByteArray(12345).also { Random(0).nextBytes(it) }.let { it + it } - val compressed = SerializationOutput(factory, compression).serialize(data) + val compressed = SerializationOutput(factory).serialize(data, compression) assertEquals(.5, compressed.size.toDouble() / data.size, .03) - assertArrayEquals(data, DeserializationInput(factory, encodingWhitelist).deserialize(compressed)) + assertArrayEquals(data, DeserializationInput(factory).deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist))) } @Test @@ -1216,9 +1216,9 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi compression ?: return val factory = defaultFactory() doReturn(false).whenever(encodingWhitelist).acceptEncoding(compression) - val compressed = SerializationOutput(factory, compression).serialize("whatever") - val input = DeserializationInput(factory, encodingWhitelist) - catchThrowable { input.deserialize(compressed) }.run { + val compressed = SerializationOutput(factory).serialize("whatever", compression) + val input = DeserializationInput(factory) + catchThrowable { input.deserialize(compressed, testSerializationContext.withEncodingWhitelist(encodingWhitelist)) }.run { assertSame(NotSerializableException::class.java, javaClass) assertEquals(encodingNotPermittedFormat.format(compression), message) } @@ -1348,5 +1348,16 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi throw Error("Deserializing serialized \$C should not throw") } } + + @Test + fun `compression reduces number of bytes significantly`() { + val ser = SerializationOutput(SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())) + val obj = ByteArray(20000) + val uncompressedSize = ser.serialize(obj).bytes.size + val compressedSize = ser.serialize(obj, CordaSerializationEncoding.SNAPPY).bytes.size + // Ordinarily this might be considered high maintenance, but we promised wire compatibility, so they'd better not change! + assertEquals(20059, uncompressedSize) + assertEquals(1018, compressedSize) + } } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt index 9f300fd9e8..e3820e2d6b 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt @@ -4,6 +4,7 @@ import net.corda.core.internal.copyTo import net.corda.core.internal.div import net.corda.core.internal.packageName import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationEncoding import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.OpaqueBytes import net.corda.serialization.internal.AllWhitelist @@ -98,9 +99,9 @@ fun SerializationOutput.serializeAndReturnSchema( @Throws(NotSerializableException::class) -fun SerializationOutput.serialize(obj: T): SerializedBytes { +fun SerializationOutput.serialize(obj: T, encoding: SerializationEncoding? = null): SerializedBytes { try { - return _serialize(obj, testSerializationContext) + return _serialize(obj, testSerializationContext.withEncoding(encoding)) } finally { andFinally() } diff --git a/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt b/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt index 94e1a45747..655ddb830c 100644 --- a/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt +++ b/tools/blobinspector/src/main/kotlin/net/corda/blobinspector/BlobInspector.kt @@ -83,7 +83,7 @@ class BlobInspector : Runnable { ?: throw IllegalArgumentException("Error: this input does not appear to be encoded in Corda's AMQP extended format, sorry.") if (schema) { - val envelope = DeserializationInput.getEnvelope(bytes.sequence()) + val envelope = DeserializationInput.getEnvelope(bytes.sequence(), SerializationDefaults.STORAGE_CONTEXT.encodingWhitelist) out.println(envelope.schema) out.println() out.println(envelope.transformsSchema) From 004ea45a051df0b36483bbf720482ae0cafcb967 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Wed, 22 Aug 2018 11:09:52 +0100 Subject: [PATCH 5/6] Ensure that every CheatingSecurityProvider has a unique name. (#3835) --- .../net/corda/deterministic/CheatingSecurityProvider.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt b/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt index 048c0cae68..f62c67b2c3 100644 --- a/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt +++ b/core-deterministic/testing/src/test/kotlin/net/corda/deterministic/CheatingSecurityProvider.kt @@ -5,14 +5,15 @@ import java.security.Provider import java.security.SecureRandom import java.security.SecureRandomSpi import java.security.Security +import java.util.concurrent.atomic.AtomicInteger /** * Temporarily restore Sun's [SecureRandom] provider. * This is ONLY for allowing us to generate test data, e.g. signatures. */ -class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"), AutoCloseable { +class CheatingSecurityProvider : Provider("Cheat-${counter.getAndIncrement()}", 1.8, "Cheat security provider"), AutoCloseable { private companion object { - private const val NAME = "Cheat!" + private val counter = AtomicInteger() } init { @@ -21,7 +22,7 @@ class CheatingSecurityProvider : Provider(NAME, 1.8, "$NAME security provider"), } override fun close() { - Security.removeProvider(NAME) + Security.removeProvider(name) } private class SunSecureRandom : SecureRandom(sun.security.provider.SecureRandom(), null) From 2fae95c58f97d7052d171303399e06756b0bfbce Mon Sep 17 00:00:00 2001 From: Florian Friemel Date: Wed, 22 Aug 2018 13:16:07 +0100 Subject: [PATCH 6/6] [CORDA-1917] Fix smoke test (#3823) --- .../corda/core/cordapp/CordappSmokeTest.kt | 66 +++++++++++++++++-- 1 file changed, 61 insertions(+), 5 deletions(-) diff --git a/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt b/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt index 6bc2743699..44195477d0 100644 --- a/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt +++ b/core/src/smoke-test/kotlin/net/corda/core/cordapp/CordappSmokeTest.kt @@ -1,23 +1,36 @@ package net.corda.core.cordapp import co.paralleluniverse.fibers.Suspendable +import net.corda.core.crypto.Crypto.generateKeyPair +import net.corda.core.crypto.sign import net.corda.core.flows.* import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party -import net.corda.core.internal.copyToDirectory -import net.corda.core.internal.createDirectories -import net.corda.core.internal.div -import net.corda.core.internal.list +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.internal.* import net.corda.core.messaging.startFlow +import net.corda.core.node.NodeInfo +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap +import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA +import net.corda.nodeapi.internal.DEV_ROOT_CA +import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.config.User +import net.corda.nodeapi.internal.createDevNodeCa +import net.corda.nodeapi.internal.crypto.CertificateType +import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeProcess import net.corda.smoketesting.NodeProcess.Companion.CORDAPPS_DIR_NAME import org.assertj.core.api.Assertions.assertThat import org.junit.Test +import java.nio.file.Path import java.nio.file.Paths +import java.security.KeyPair +import java.security.PrivateKey +import java.security.PublicKey import java.util.concurrent.atomic.AtomicInteger import kotlin.streams.toList @@ -40,13 +53,21 @@ class CordappSmokeTest { @Test fun `FlowContent appName returns the filename of the CorDapp jar`() { - val cordappsDir = (factory.baseDirectory(aliceConfig) / CORDAPPS_DIR_NAME).createDirectories() + val baseDir = factory.baseDirectory(aliceConfig) + val cordappsDir = (baseDir / CORDAPPS_DIR_NAME).createDirectories() // Find the jar file for the smoke tests of this module val selfCordapp = Paths.get("build", "libs").list { it.filter { "-smokeTests" in it.toString() }.toList().single() } selfCordapp.copyToDirectory(cordappsDir) + // The `nodeReadyFuture` in the persistent network map cache will not complete unless there is at least one other + // node in the network. We work around this limitation by putting another node info file in the additional-node-info + // folder. + // TODO clean this up after we refactor the persistent network map cache / network map updater + val additionalNodeInfoDir = (baseDir / "additional-node-infos").createDirectories() + createDummyNodeInfo(additionalNodeInfoDir) + factory.create(aliceConfig).use { alice -> alice.connect().use { connectionToAlice -> val aliceIdentity = connectionToAlice.proxy.nodeInfo().legalIdentitiesAndCerts.first().party @@ -90,4 +111,39 @@ class CordappSmokeTest { otherPartySession.send(sessionInitContext) } } + + private fun createDummyNodeInfo(additionalNodeInfoDir: Path) { + val dummyKeyPair = generateKeyPair() + val nodeInfo = createNodeInfoWithSingleIdentity(CordaX500Name(organisation = "Bob Corp", locality = "Madrid", country = "ES"), dummyKeyPair, dummyKeyPair.public) + val signedNodeInfo = signWith(nodeInfo, listOf(dummyKeyPair.private)) + (additionalNodeInfoDir / "nodeInfo-41408E093F95EAD51F6892C34DEB65AE1A3569A4B0E5744769A1B485AF8E04B5").write(signedNodeInfo.serialize().bytes) + } + + private fun createNodeInfoWithSingleIdentity(name: CordaX500Name, nodeKeyPair: KeyPair, identityCertPublicKey: PublicKey): NodeInfo { + val nodeCertificateAndKeyPair = createDevNodeCa(DEV_INTERMEDIATE_CA, name, nodeKeyPair) + val identityCert = X509Utilities.createCertificate( + CertificateType.LEGAL_IDENTITY, + nodeCertificateAndKeyPair.certificate, + nodeCertificateAndKeyPair.keyPair, + nodeCertificateAndKeyPair.certificate.subjectX500Principal, + identityCertPublicKey) + val certPath = X509Utilities.buildCertPath( + identityCert, + nodeCertificateAndKeyPair.certificate, + DEV_INTERMEDIATE_CA.certificate, + DEV_ROOT_CA.certificate) + val partyAndCertificate = PartyAndCertificate(certPath) + return NodeInfo( + listOf(NetworkHostAndPort("my.${partyAndCertificate.party.name.organisation}.com", 1234)), + listOf(partyAndCertificate), + 1, + 1 + ) + } + + private fun signWith(nodeInfo: NodeInfo, keys: List): SignedNodeInfo { + val serialized = nodeInfo.serialize() + val signatures = keys.map { it.sign(serialized.bytes) } + return SignedNodeInfo(serialized, signatures) + } }