From a33309a31bd2fc93a936eb060335fda66839dabf Mon Sep 17 00:00:00 2001 From: Adel El-Beik <48713346+adelel1@users.noreply.github.com> Date: Wed, 27 May 2020 11:35:15 +0100 Subject: [PATCH 1/5] CORDA-3755: Backport AttachmentURLStreamHandlerFactory memory leak (#6274) * CORDA-3755: Switched attachments map to a WeakHashMap (#6214) * CORDA-3772: Now specify source and target of 8 when compiling contract classes. * CORDA-3651: addManifest now uses separate files for reading and writing. (#6026) * CORDA-3651: addManifest now uses separate files for reading and writing. * CORDA-3651: The jar scanning loader now closes itsself. Co-authored-by: Adel El-Beik Co-authored-by: Adel El-Beik --- core-deterministic/build.gradle | 1 + .../internal/AttachmentsHolderImpl.kt | 23 ++++ .../AttachmentsClassLoaderTests.kt | 126 +++++++++++++++++- .../internal/AttachmentsClassLoader.kt | 79 ++++++++--- .../core/internal/ContractJarTestUtils.kt | 12 +- .../core/internal/JarSignatureTestUtils.kt | 5 +- 6 files changed, 218 insertions(+), 28 deletions(-) create mode 100644 core-deterministic/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsHolderImpl.kt diff --git a/core-deterministic/build.gradle b/core-deterministic/build.gradle index 4b84f32d7e..5d7d1cd971 100644 --- a/core-deterministic/build.gradle +++ b/core-deterministic/build.gradle @@ -59,6 +59,7 @@ task patchCore(type: Zip, dependsOn: coreJarTask) { from(zipTree(originalJar)) { exclude 'net/corda/core/internal/*ToggleField*.class' exclude 'net/corda/core/serialization/*SerializationFactory*.class' + exclude 'net/corda/core/serialization/internal/AttachmentsHolderImpl.class' exclude 'net/corda/core/serialization/internal/CheckpointSerializationFactory*.class' exclude 'net/corda/core/internal/rules/*.class' } diff --git a/core-deterministic/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsHolderImpl.kt b/core-deterministic/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsHolderImpl.kt new file mode 100644 index 0000000000..a2f7b8ab30 --- /dev/null +++ b/core-deterministic/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsHolderImpl.kt @@ -0,0 +1,23 @@ +package net.corda.core.serialization.internal + +import net.corda.core.contracts.Attachment +import java.net.URL + +@Suppress("unused") +private class AttachmentsHolderImpl : AttachmentsHolder { + private val attachments = LinkedHashMap>() + + override val size: Int get() = attachments.size + + override fun getKey(key: URL): URL? { + return attachments[key]?.first + } + + override fun get(key: URL): Attachment? { + return attachments[key]?.second + } + + override fun set(key: URL, value: Attachment) { + attachments[key] = key to value + } +} diff --git a/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt index 9007a7e28d..3bcc21dd9a 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/transactions/AttachmentsClassLoaderTests.kt @@ -1,19 +1,37 @@ package net.corda.coretests.transactions +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.Attachment +import net.corda.core.contracts.CommandData +import net.corda.core.contracts.CommandWithParties import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractAttachment +import net.corda.core.contracts.PrivacySalt +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.contracts.TransactionState import net.corda.core.contracts.TransactionVerificationException import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash +import net.corda.core.identity.Party +import net.corda.core.internal.AbstractAttachment import net.corda.core.internal.AttachmentTrustCalculator +import net.corda.core.internal.createLedgerTransaction import net.corda.core.internal.declaredField import net.corda.core.internal.hash import net.corda.core.internal.inputStream import net.corda.core.node.NetworkParameters import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.internal.AttachmentsClassLoader -import net.corda.node.services.attachments.NodeAttachmentTrustCalculator import net.corda.testing.common.internal.testNetworkParameters +import net.corda.node.services.attachments.NodeAttachmentTrustCalculator +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity +import net.corda.testing.core.internal.ContractJarTestUtils import net.corda.testing.core.internal.ContractJarTestUtils.signContractJar import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.fakeAttachment @@ -24,10 +42,14 @@ import org.apache.commons.io.IOUtils import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals import org.junit.Before +import org.junit.Rule import org.junit.Test +import org.junit.rules.TemporaryFolder import java.io.ByteArrayOutputStream import java.io.InputStream import java.net.URL +import java.nio.file.Path +import java.security.PublicKey import kotlin.test.assertFailsWith class AttachmentsClassLoaderTests { @@ -43,8 +65,21 @@ class AttachmentsClassLoaderTests { it.toByteArray() } } + val ALICE = TestIdentity(ALICE_NAME, 70).party + val BOB = TestIdentity(BOB_NAME, 80).party + val dummyNotary = TestIdentity(DUMMY_NOTARY_NAME, 20) + val DUMMY_NOTARY get() = dummyNotary.party + val PROGRAM_ID: String = "net.corda.testing.contracts.MyDummyContract" } + @Rule + @JvmField + val tempFolder = TemporaryFolder() + + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule(true) + private lateinit var storage: MockAttachmentStorage private lateinit var internalStorage: InternalMockAttachmentStorage private lateinit var attachmentTrustCalculator: AttachmentTrustCalculator @@ -448,4 +483,93 @@ class AttachmentsClassLoaderTests { createClassloader(trustedAttachment) } + + @Test(timeout=300_000) + fun `attachment still available in verify after forced gc in verify`() { + tempFolder.root.toPath().let { path -> + val baseOutState = TransactionState(DummyContract.SingleOwnerState(0, ALICE), PROGRAM_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint) + val inputs = emptyList>() + val outputs = listOf(baseOutState, baseOutState.copy(notary = ALICE), baseOutState.copy(notary = BOB)) + val commands = emptyList>() + + val content = createContractString(PROGRAM_ID) + val contractJarPath = ContractJarTestUtils.makeTestContractJar(path, PROGRAM_ID, content = content) + + val attachments = createAttachments(contractJarPath) + + val id = SecureHash.randomSHA256() + val timeWindow: TimeWindow? = null + val privacySalt = PrivacySalt() + val transaction = createLedgerTransaction( + inputs, + outputs, + commands, + attachments, + id, + null, + timeWindow, + privacySalt, + testNetworkParameters(), + emptyList(), + isAttachmentTrusted = { true } + ) + transaction.verify() + } + } + + private fun createContractString(contractName: String, versionSeed: Int = 0): String { + val pkgs = contractName.split(".") + val className = pkgs.last() + val packages = pkgs.subList(0, pkgs.size - 1) + + val output = """package ${packages.joinToString(".")}; + import net.corda.core.contracts.*; + import net.corda.core.transactions.*; + import java.net.URL; + import java.io.InputStream; + + public class $className implements Contract { + private int seed = $versionSeed; + @Override + public void verify(LedgerTransaction tx) throws IllegalArgumentException { + System.gc(); + InputStream str = this.getClass().getClassLoader().getResourceAsStream("importantDoc.pdf"); + if (str == null) throw new IllegalStateException("Could not find importantDoc.pdf"); + } + } + """.trimIndent() + + System.out.println(output) + return output + } + + private fun createAttachments(contractJarPath: Path) : List { + + val attachment = object : AbstractAttachment({contractJarPath.inputStream().readBytes()}, uploader = "app") { + @Suppress("OverridingDeprecatedMember") + override val signers: List = emptyList() + override val signerKeys: List = emptyList() + override val size: Int = 1234 + override val id: SecureHash = SecureHash.sha256(attachmentData) + } + val contractAttachment = ContractAttachment(attachment, PROGRAM_ID) + + return listOf( + object : AbstractAttachment({ISOLATED_CONTRACTS_JAR_PATH.openStream().readBytes()}, uploader = "app") { + @Suppress("OverridingDeprecatedMember") + override val signers: List = emptyList() + override val signerKeys: List = emptyList() + override val size: Int = 1234 + override val id: SecureHash = SecureHash.sha256(attachmentData) + }, + object : AbstractAttachment({fakeAttachment("importantDoc.pdf", "I am a pdf!").inputStream().readBytes() + }, uploader = "app") { + @Suppress("OverridingDeprecatedMember") + override val signers: List = emptyList() + override val signerKeys: List = emptyList() + override val size: Int = 1234 + override val id: SecureHash = SecureHash.sha256(attachmentData) + }, + contractAttachment) + } } diff --git a/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt b/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt index e85e5f838f..a0f6f6c141 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/internal/AttachmentsClassLoader.kt @@ -17,6 +17,7 @@ import net.corda.core.utilities.debug import java.io.ByteArrayOutputStream import java.io.IOException import java.io.InputStream +import java.lang.ref.WeakReference import java.net.* import java.util.* import java.util.jar.JarInputStream @@ -53,14 +54,6 @@ class AttachmentsClassLoader(attachments: List, private val ignoreDirectories = listOf("org/jolokia/", "org/json/simple/") private val ignorePackages = ignoreDirectories.map { it.replace("/", ".") } - @VisibleForTesting - private fun readAttachment(attachment: Attachment, filepath: String): ByteArray { - ByteArrayOutputStream().use { - attachment.extractFile(filepath, it) - return it.toByteArray() - } - } - /** * Apply our custom factory either directly, if `URL.setURLStreamHandlerFactory` has not been called yet, * or use a decorator and reflection to bypass the single-call-per-JVM restriction otherwise. @@ -354,8 +347,7 @@ object AttachmentsClassLoaderBuilder { object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory { private const val attachmentScheme = "attachment" - // TODO - what happens if this grows too large? - private val loadedAttachments = mutableMapOf().toSynchronised() + private val loadedAttachments: AttachmentsHolder = AttachmentsHolderImpl() override fun createURLStreamHandler(protocol: String): URLStreamHandler? { return if (attachmentScheme == protocol) { @@ -363,25 +355,70 @@ object AttachmentURLStreamHandlerFactory : URLStreamHandlerFactory { } else null } + @Synchronized fun toUrl(attachment: Attachment): URL { - val id = attachment.id.toString() - loadedAttachments[id] = attachment - return URL(attachmentScheme, "", -1, id, AttachmentURLStreamHandler) + val proposedURL = URL(attachmentScheme, "", -1, attachment.id.toString(), AttachmentURLStreamHandler) + val existingURL = loadedAttachments.getKey(proposedURL) + return if (existingURL == null) { + loadedAttachments[proposedURL] = attachment + proposedURL + } else { + existingURL + } } + @VisibleForTesting + fun loadedAttachmentsSize(): Int = loadedAttachments.size + private object AttachmentURLStreamHandler : URLStreamHandler() { override fun openConnection(url: URL): URLConnection { if (url.protocol != attachmentScheme) throw IOException("Cannot handle protocol: ${url.protocol}") - val attachment = loadedAttachments[url.path] ?: throw IOException("Could not load url: $url .") + val attachment = loadedAttachments[url] ?: throw IOException("Could not load url: $url .") return AttachmentURLConnection(url, attachment) } - } - private class AttachmentURLConnection(url: URL, private val attachment: Attachment) : URLConnection(url) { - override fun getContentLengthLong(): Long = attachment.size.toLong() - override fun getInputStream(): InputStream = attachment.open() - override fun connect() { - connected = true + override fun equals(attachmentUrl: URL, otherURL: URL?): Boolean { + if (attachmentUrl.protocol != otherURL?.protocol) return false + if (attachmentUrl.protocol != attachmentScheme) throw IllegalArgumentException("Cannot handle protocol: ${attachmentUrl.protocol}") + return attachmentUrl.file == otherURL?.file + } + + override fun hashCode(url: URL): Int { + if (url.protocol != attachmentScheme) throw IllegalArgumentException("Cannot handle protocol: ${url.protocol}") + return url.file.hashCode() + } + + private class AttachmentURLConnection(url: URL, private val attachment: Attachment) : URLConnection(url) { + override fun getContentLengthLong(): Long = attachment.size.toLong() + override fun getInputStream(): InputStream = attachment.open() + override fun connect() { + connected = true + } } } -} \ No newline at end of file +} + +interface AttachmentsHolder { + val size: Int + fun getKey(key: URL): URL? + operator fun get(key: URL): Attachment? + operator fun set(key: URL, value: Attachment) +} + +private class AttachmentsHolderImpl : AttachmentsHolder { + private val attachments = WeakHashMap, Attachment>>().toSynchronised() + + override val size: Int get() = attachments.size + + override fun getKey(key: URL): URL? { + return attachments[key]?.first?.get() + } + + override fun get(key: URL): Attachment? { + return attachments[key]?.second + } + + override fun set(key: URL, value: Attachment) { + attachments[key] = WeakReference(key) to value + } +} diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/ContractJarTestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/ContractJarTestUtils.kt index cefc596c3a..3ba57fa20a 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/ContractJarTestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/ContractJarTestUtils.kt @@ -59,12 +59,14 @@ object ContractJarTestUtils { return workingDir.resolve(jarName) to signer } + @Suppress("LongParameterList") @JvmOverloads - fun makeTestContractJar(workingDir: Path, contractName: String, signed: Boolean = false, version: Int = 1, versionSeed: Int = 0): Path { + fun makeTestContractJar(workingDir: Path, contractName: String, signed: Boolean = false, version: Int = 1, versionSeed: Int = 0, + content: String? = null): Path { val packages = contractName.split(".") val jarName = "attachment-${packages.last()}-$version-$versionSeed-${(if (signed) "signed" else "")}.jar" val className = packages.last() - createTestClass(workingDir, className, packages.subList(0, packages.size - 1), versionSeed) + createTestClass(workingDir, className, packages.subList(0, packages.size - 1), versionSeed, content) workingDir.createJar(jarName, "${contractName.replace(".", "/")}.class") workingDir.addManifest(jarName, Pair(Attributes.Name(CORDAPP_CONTRACT_VERSION), version.toString())) return workingDir.resolve(jarName) @@ -87,8 +89,8 @@ object ContractJarTestUtils { return workingDir.resolve(jarName) } - private fun createTestClass(workingDir: Path, className: String, packages: List, versionSeed: Int = 0): Path { - val newClass = """package ${packages.joinToString(".")}; + private fun createTestClass(workingDir: Path, className: String, packages: List, versionSeed: Int = 0, content: String? = null): Path { + val newClass = content ?: """package ${packages.joinToString(".")}; import net.corda.core.contracts.*; import net.corda.core.transactions.*; @@ -108,7 +110,7 @@ object ContractJarTestUtils { val fileManager = compiler.getStandardFileManager(null, null, null) fileManager.setLocation(StandardLocation.CLASS_OUTPUT, listOf(workingDir.toFile())) - compiler.getTask(System.out.writer(), fileManager, null, null, null, listOf(source)).call() + compiler.getTask(System.out.writer(), fileManager, null, listOf("-source", "8", "-target", "8"), null, listOf(source)).call() val outFile = fileManager.getFileForInput(StandardLocation.CLASS_OUTPUT, packages.joinToString("."), "$className.class") return Paths.get(outFile.name) } diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/JarSignatureTestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/JarSignatureTestUtils.kt index 00a827c1e1..4c21340f84 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/JarSignatureTestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/core/internal/JarSignatureTestUtils.kt @@ -12,6 +12,7 @@ import java.nio.file.Files import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.Paths +import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.security.PublicKey import java.util.jar.Attributes import java.util.jar.JarInputStream @@ -88,12 +89,13 @@ object JarSignatureTestUtils { JarInputStream(FileInputStream((this / fileName).toFile())).use(JarSignatureCollector::collectSigners) fun Path.addManifest(fileName: String, vararg entries: Pair) { + val outputFile = this / (fileName + "Output") JarInputStream(FileInputStream((this / fileName).toFile())).use { input -> val manifest = input.manifest ?: Manifest() entries.forEach { (attributeName, value) -> manifest.mainAttributes[attributeName] = value } - val output = JarOutputStream(FileOutputStream((this / fileName).toFile()), manifest) + val output = JarOutputStream(FileOutputStream(outputFile.toFile()), manifest) var entry = input.nextEntry val buffer = ByteArray(1 shl 14) while (true) { @@ -108,5 +110,6 @@ object JarSignatureTestUtils { } output.close() } + Files.copy(outputFile, this / fileName, REPLACE_EXISTING) } } From 7ab6a8f60077db505c434a516f49cbea8ec46f9e Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Tue, 16 Jun 2020 09:22:26 +0100 Subject: [PATCH 2/5] CORDA-3841 Check `isAnyCheckpointPersisted` in `startFlowInternal` (#6351) Only hit the database if `StateMachineState.isAnyCheckpointPersisted` returns true. Otherwise, there will be no checkpoint to retrieve from the database anyway. This can prevent errors due to a transient loss of connection to the database. --- .../StatemachineGeneralErrorHandlingTest.kt | 521 ++++++++++++++++-- .../persistence/DBCheckpointStorage.kt | 1 + .../SingleThreadedStateMachineManager.kt | 12 +- .../statemachine/StaffedFlowHospital.kt | 2 + 4 files changed, 489 insertions(+), 47 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt index 5aacac8a4a..740ea8ca43 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt @@ -4,6 +4,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.testing.core.ALICE_NAME @@ -11,6 +12,8 @@ import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity import org.junit.Ignore import org.junit.Test +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -18,6 +21,10 @@ import kotlin.test.assertFailsWith @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { + private companion object { + val executor: ExecutorService = Executors.newSingleThreadExecutor() + } + /** * Throws an exception when performing an [Action.SendInitial] action. * The exception is thrown 4 times. @@ -25,8 +32,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in * the hospital for observation. */ - @Test(timeout=300_000) - fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -79,7 +86,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -105,8 +115,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * succeeds and the flow finishes. */ - @Test(timeout=300_000) - fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -158,7 +168,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -185,8 +198,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries. * The flow should complete successfully as the error is swallowed. */ - @Test(timeout=300_000) - fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() { + @Test(timeout = 300_000) + fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -238,7 +251,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -270,8 +286,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to * verify that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -323,7 +339,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -356,8 +375,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * CORDA-3352 - it is currently hanging after putting the flow in for observation */ - @Test(timeout=300_000) -@Ignore + @Test(timeout = 300_000) + @Ignore fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) @@ -411,7 +430,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -443,8 +465,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to * verify that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -513,7 +535,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -540,8 +565,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * Each time the flow retries, it begins from the previous checkpoint where it suspended before failing. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -602,7 +627,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -629,8 +657,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight * observation. It is not ran again after this point */ - @Test(timeout=300_000) - fun `error during retry of a flow will force the flow into overnight observation`() { + @Test(timeout = 300_000) + fun `error during retry of a flow will force the flow into overnight observation`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -699,7 +727,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -729,8 +760,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * flow will still finish successfully. This is due to the even being scheduled as part of the retry and the failure in the database * commit occurs after this point. As the flow is already scheduled, the failure has not affect on it. */ - @Test(timeout=300_000) - fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { + @Test(timeout = 300_000) + fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -798,7 +829,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -828,8 +862,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * CORDA-3352 - it is currently hanging after putting the flow in for observation * */ - @Test(timeout=300_000) -@Ignore + @Test(timeout = 300_000) + @Ignore fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() { startDriver { val charlie = createNode(CHARLIE_NAME) @@ -883,7 +917,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -910,8 +947,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * Each time the flow retries, it begins from the previous checkpoint where it suspended before failing. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -975,7 +1012,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -994,6 +1034,196 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { } } + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 5 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 5 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 7 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept for in for observation. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 7 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + executor.execute { + aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()) + } + + // flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead + Thread.sleep(30.seconds.toMillis()) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event on a responding flow. The failure prevents the node from saving * its original checkpoint. @@ -1009,8 +1239,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1064,7 +1294,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -1104,8 +1337,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * able to recover when the node is restarted (by using the events). The initiating flow maintains the checkpoint as it is waiting for * the responding flow to recover and finish its flow. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1160,7 +1393,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -1192,8 +1428,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * succeeds and the flow finishes. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1258,7 +1494,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -1278,4 +1517,202 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) } } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 5 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database`() { + startDriver { + val charlie = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 5 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + val charlieClient = + CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + val output = getBytemanOutput(charlie) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + assertEquals(0, charlieClient.stateMachinesSnapshot().size) + assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 7 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept for in for observation. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val charlie = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 7 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + val charlieClient = + CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + } + + val output = getBytemanOutput(charlie) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + assertEquals(1, charlieClient.stateMachinesSnapshot().size) + assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index b1eec763f6..78a0e627bd 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -66,6 +66,7 @@ class DBCheckpointStorage : CheckpointStorage { return session.createQuery(delete).executeUpdate() > 0 } + @Throws(SQLException::class) override fun getCheckpoint(id: StateMachineRunId): SerializedBytes? { val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null return SerializedBytes(bytes) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 595b644493..d6e58f84b9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -607,9 +607,8 @@ class SingleThreadedStateMachineManager( val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion) - val flowAlreadyExists = mutex.locked { flows[flowId] != null } - - val existingCheckpoint = if (flowAlreadyExists) { + val existingFlow = mutex.locked { flows[flowId] } + val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) { // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -617,8 +616,10 @@ class SingleThreadedStateMachineManager( val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId) if (checkpoint == null) { return openFuture>().mapError { - IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " + - "Something is very wrong. The flow will not retry.") + IllegalStateException( + "Unable to deserialize database checkpoint for flow $flowId. " + + "Something is very wrong. The flow will not retry." + ) } } else { checkpoint @@ -628,6 +629,7 @@ class SingleThreadedStateMachineManager( // This is a brand new flow null } + val checkpoint = existingCheckpoint ?: Checkpoint.create( invocationContext, flowStart, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 6408748b42..065b352ff6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -146,6 +146,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, val payload = RejectSessionMessage(message, secureRandom.nextLong()) val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload) + log.info("Sending session initiation error back to $sender", error) + flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) event.deduplicationHandler.afterDatabaseTransaction() } From 24b0240d822d341dc70b31e6d29c857d9ce7b56d Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Wed, 17 Jun 2020 14:32:12 +0100 Subject: [PATCH 3/5] EG-2654 - Ensure stack traces are printed to the logs in error reporting (#6345) * EG-2654 Ensure stack trace is printed to the logs in error reporting * EG-2654 - Add a test case for exception logging --- .../errorReporting/ErrorReporterImpl.kt | 7 ++++++- .../database-failed-startup.properties | 2 +- .../database-failed-startup_en_US.properties | 2 +- .../errorReporting/DatabaseErrorsTest.kt | 2 +- .../errorReporting/ErrorReporterImplTest.kt | 18 +++++++++++++++++- .../errorReporting/test-case4.properties | 4 ++++ .../errorReporting/test-case4_en_US.properties | 4 ++++ .../net/corda/node/internal/AbstractNode.kt | 7 +++++-- 8 files changed, 39 insertions(+), 7 deletions(-) create mode 100644 common/logging/src/test/resources/errorReporting/test-case4.properties create mode 100644 common/logging/src/test/resources/errorReporting/test-case4_en_US.properties diff --git a/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt b/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt index 0e508959e8..553303870e 100644 --- a/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt +++ b/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt @@ -1,6 +1,7 @@ package net.corda.common.logging.errorReporting import org.slf4j.Logger +import java.lang.Exception import java.text.MessageFormat import java.util.* @@ -31,6 +32,10 @@ internal class ErrorReporterImpl(private val resourceLocation: String, override fun report(error: ErrorCode<*>, logger: Logger) { val errorResource = ErrorResource.fromErrorCode(error, resourceLocation, locale) val message = "${errorResource.getErrorMessage(error.parameters.toTypedArray())} ${getErrorInfo(error)}" - logger.error(message) + if (error is Exception) { + logger.error(message, error) + } else { + logger.error(message) + } } } \ No newline at end of file diff --git a/common/logging/src/main/resources/error-codes/database-failed-startup.properties b/common/logging/src/main/resources/error-codes/database-failed-startup.properties index 996de3ba76..2c21cd3e9a 100644 --- a/common/logging/src/main/resources/error-codes/database-failed-startup.properties +++ b/common/logging/src/main/resources/error-codes/database-failed-startup.properties @@ -1,4 +1,4 @@ -errorTemplate = Failed to create the datasource. See the logs for further information and the cause. +errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause. shortDescription = The datasource could not be created for unknown reasons. actionsToFix = The logs in the logs directory should contain more information on what went wrong. aliases = \ No newline at end of file diff --git a/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties b/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties index 1abe8840bb..194292abf5 100644 --- a/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties +++ b/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties @@ -1,3 +1,3 @@ -errorTemplate = Failed to create the datasource. See the logs for further information and the cause. +errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause. shortDescription = The datasource could not be created for unknown reasons. actionsToFix = The logs in the logs directory should contain more information on what went wrong. \ No newline at end of file diff --git a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt index d8697e9415..753325a0cd 100644 --- a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt +++ b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt @@ -6,7 +6,7 @@ import java.net.InetAddress class DatabaseErrorsTest : ErrorCodeTest(NodeDatabaseErrors::class.java) { override val dataForCodes = mapOf( NodeDatabaseErrors.COULD_NOT_CONNECT to listOf(), - NodeDatabaseErrors.FAILED_STARTUP to listOf(), + NodeDatabaseErrors.FAILED_STARTUP to listOf("This is a test message"), NodeDatabaseErrors.MISSING_DRIVER to listOf(), NodeDatabaseErrors.PASSWORD_REQUIRED_FOR_H2 to listOf(InetAddress.getLocalHost()) ) diff --git a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt index 40efb4e164..95f9d38141 100644 --- a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt +++ b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt @@ -7,6 +7,7 @@ import net.corda.common.logging.errorReporting.ErrorContextProvider import net.corda.common.logging.errorReporting.ErrorReporterImpl import org.junit.After import org.junit.Test +import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.anyString import org.mockito.Mockito import org.slf4j.Logger @@ -24,6 +25,7 @@ class ErrorReporterImplTest { private val loggerMock = Mockito.mock(Logger::class.java).also { Mockito.`when`(it.error(anyString())).then { logs.addAll(it.arguments) } + Mockito.`when`(it.error(anyString(), any(Exception::class.java))).then { params -> logs.addAll(params.arguments) } } private val contextProvider: ErrorContextProvider = object : ErrorContextProvider { @@ -39,7 +41,8 @@ class ErrorReporterImplTest { private enum class TestErrors : ErrorCodes { CASE1, CASE2, - CASE_3; + CASE_3, + CASE4; override val namespace = TestNamespaces.TEST.toString() } @@ -59,6 +62,11 @@ class ErrorReporterImplTest { override val parameters = listOf() } + private class TestError4(cause: Exception?) : Exception("This is test error 4", cause), ErrorCode { + override val code = TestErrors.CASE4 + override val parameters = listOf() + } + private fun createReporterImpl(localeTag: String?) : ErrorReporterImpl { val locale = if (localeTag != null) Locale.forLanguageTag(localeTag) else Locale.getDefault() return ErrorReporterImpl("errorReporting", locale, contextProvider) @@ -118,4 +126,12 @@ class ErrorReporterImplTest { testReporter.report(error, loggerMock) assertEquals(listOf("This is the third test message [Code: test-case-3 URL: $TEST_URL/en-US]"), logs) } + + @Test(timeout = 3_000) + fun `exception based error code logs the stack trace`() { + val error = TestError4(Exception("A test exception")) + val testReporter = createReporterImpl("en-US") + testReporter.report(error, loggerMock) + assertEquals(listOf("This is the fourth test message [Code: test-case4 URL: $TEST_URL/en-US]", error), logs) + } } \ No newline at end of file diff --git a/common/logging/src/test/resources/errorReporting/test-case4.properties b/common/logging/src/test/resources/errorReporting/test-case4.properties new file mode 100644 index 0000000000..e4911daacf --- /dev/null +++ b/common/logging/src/test/resources/errorReporting/test-case4.properties @@ -0,0 +1,4 @@ +errorTemplate = This is the fourth test message +shortDescription = Test description +actionsToFix = Actions +aliases = \ No newline at end of file diff --git a/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties b/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties new file mode 100644 index 0000000000..e4911daacf --- /dev/null +++ b/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties @@ -0,0 +1,4 @@ +errorTemplate = This is the fourth test message +shortDescription = Test description +actionsToFix = Actions +aliases = \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 9c015067e3..e53e6bd418 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1360,11 +1360,14 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi "Could not find the database driver class. Please add it to the 'drivers' folder.", NodeDatabaseErrors.MISSING_DRIVER) ex is OutstandingDatabaseChangesException -> throw (DatabaseIncompatibleException(ex.message)) - else -> + else -> { + val msg = ex.message ?: ex::class.java.canonicalName throw CouldNotCreateDataSourceException( "Could not create the DataSource: ${ex.message}", NodeDatabaseErrors.FAILED_STARTUP, - cause = ex) + cause = ex, + parameters = listOf(msg)) + } } } } From d0c0a1d9ba5754ae9bb8777a8a821d267696b876 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Wed, 17 Jun 2020 17:28:26 +0100 Subject: [PATCH 4/5] ENT-5430: Fix deserialisation of commands containing generic types. (#6359) --- .../serialization/generics/DataObject.kt | 14 +++++ .../generics/GenericTypeContract.kt | 37 ++++++++++++ .../serialization/generics/GenericTypeFlow.kt | 27 +++++++++ .../corda/node/ContractWithGenericTypeTest.kt | 52 ++++++++++++++++ ...eterministicContractWithGenericTypeTest.kt | 59 +++++++++++++++++++ .../djvm/SandboxSerializerFactoryFactory.kt | 2 +- .../internal/amqp/LocalSerializerFactory.kt | 3 +- .../internal/amqp/SerializerFactoryBuilder.kt | 2 +- .../internal/model/TypeIdentifier.kt | 18 +++--- .../model/TypeModellingFingerPrinter.kt | 12 ++-- .../amqp/TypeModellingFingerPrinterTests.kt | 2 +- 11 files changed, 207 insertions(+), 21 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt create mode 100644 node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt create mode 100644 node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt diff --git a/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt new file mode 100644 index 0000000000..6384bd3900 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt @@ -0,0 +1,14 @@ +package net.corda.contracts.serialization.generics + +import net.corda.core.serialization.CordaSerializable + +@CordaSerializable +data class DataObject(val value: Long) : Comparable { + override fun toString(): String { + return "$value data points" + } + + override fun compareTo(other: DataObject): Int { + return value.compareTo(other.value) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt new file mode 100644 index 0000000000..4fcdae9da3 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt @@ -0,0 +1,37 @@ +package net.corda.contracts.serialization.generics + +import net.corda.core.contracts.CommandData +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.identity.AbstractParty +import net.corda.core.transactions.LedgerTransaction +import java.util.Optional + +@Suppress("unused") +class GenericTypeContract : Contract { + override fun verify(tx: LedgerTransaction) { + val state = tx.outputsOfType() + require(state.isNotEmpty()) { + "Requires at least one data state" + } + } + + @Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") + class State(val owner: AbstractParty, val data: DataObject) : ContractState { + override val participants: List = listOf(owner) + + @Override + override fun toString(): String { + return data.toString() + } + } + + /** + * The [price] field is the important feature of the [Purchase] + * class because its type is [Optional] with a CorDapp-specific + * generic type parameter. It does not matter that the [price] + * is not used; it only matters that the [Purchase] command + * must be serialized as part of building a new transaction. + */ + class Purchase(val price: Optional) : CommandData +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt new file mode 100644 index 0000000000..2325d767b0 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt @@ -0,0 +1,27 @@ +package net.corda.flows.serialization.generics + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.serialization.generics.DataObject +import net.corda.contracts.serialization.generics.GenericTypeContract.Purchase +import net.corda.contracts.serialization.generics.GenericTypeContract.State +import net.corda.core.contracts.Command +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.transactions.TransactionBuilder +import java.util.Optional + +@StartableByRPC +class GenericTypeFlow(private val purchase: DataObject) : FlowLogic() { + @Suspendable + override fun call(): SecureHash { + val notary = serviceHub.networkMapCache.notaryIdentities[0] + val stx = serviceHub.signInitialTransaction( + TransactionBuilder(notary) + .addOutputState(State(ourIdentity, purchase)) + .addCommand(Command(Purchase(Optional.of(purchase)), ourIdentity.owningKey)) + ) + stx.verify(serviceHub, checkSufficientSignatures = false) + return stx.id + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt b/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt new file mode 100644 index 0000000000..4a093de5ba --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt @@ -0,0 +1,52 @@ +package net.corda.node + +import net.corda.client.rpc.CordaRPCClient +import net.corda.contracts.serialization.generics.DataObject +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.loggerFor +import net.corda.flows.serialization.generics.GenericTypeFlow +import net.corda.node.services.Permissions +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.User +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.Test + +@Suppress("FunctionName") +class ContractWithGenericTypeTest { + companion object { + const val DATA_VALUE = 5000L + + @JvmField + val logger = loggerFor() + } + + @Test(timeout=300_000) + fun `flow with generic type`() { + val user = User("u", "p", setOf(Permissions.all())) + driver(DriverParameters( + portAllocation = incrementalPortAllocation(), + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)), + cordappsForAllNodes = listOf( + cordappWithPackages("net.corda.flows.serialization.generics").signed(), + cordappWithPackages("net.corda.contracts.serialization.generics").signed() + ) + )) { + val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val txID = CordaRPCClient(hostAndPort = alice.rpcAddress) + .start(user.username, user.password) + .use { client -> + client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE)) + .returnValue + .getOrThrow() + } + logger.info("TX-ID=$txID") + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt new file mode 100644 index 0000000000..b788091232 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt @@ -0,0 +1,59 @@ +package net.corda.node.services + +import net.corda.client.rpc.CordaRPCClient +import net.corda.contracts.serialization.generics.DataObject +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.loggerFor +import net.corda.flows.serialization.generics.GenericTypeFlow +import net.corda.node.DeterministicSourcesRule +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.User +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.ClassRule +import org.junit.Test + +@Suppress("FunctionName") +class DeterministicContractWithGenericTypeTest { + companion object { + const val DATA_VALUE = 5000L + + @JvmField + val logger = loggerFor() + + @ClassRule + @JvmField + val djvmSources = DeterministicSourcesRule() + } + + @Test(timeout=300_000) + fun `test DJVM can deserialise command with generic type`() { + val user = User("u", "p", setOf(Permissions.all())) + driver(DriverParameters( + portAllocation = incrementalPortAllocation(), + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)), + cordappsForAllNodes = listOf( + cordappWithPackages("net.corda.flows.serialization.generics").signed(), + cordappWithPackages("net.corda.contracts.serialization.generics").signed() + ), + djvmBootstrapSource = djvmSources.bootstrap, + djvmCordaSource = djvmSources.corda + )) { + val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val txID = CordaRPCClient(hostAndPort = alice.rpcAddress) + .start(user.username, user.password) + .use { client -> + client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE)) + .returnValue + .getOrThrow() + } + logger.info("TX-ID=$txID") + } + } +} \ No newline at end of file diff --git a/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt b/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt index 95abdcd3fe..bafa2b8dea 100644 --- a/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt +++ b/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt @@ -75,7 +75,7 @@ class SandboxSerializerFactoryFactory( ) ) - val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry) + val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry, classLoader) val localSerializerFactory = DefaultLocalSerializerFactory( whitelist = context.whitelist, diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt index 9cf064a70b..9b0ce7b9ae 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt @@ -7,7 +7,6 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.serialization.internal.model.* import net.corda.serialization.internal.model.TypeIdentifier.* -import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor import org.apache.qpid.proton.amqp.Symbol import java.lang.reflect.ParameterizedType import java.lang.reflect.Type @@ -161,7 +160,7 @@ class DefaultLocalSerializerFactory( val declaredGenericType = if (declaredType !is ParameterizedType && localTypeInformation.typeIdentifier is Parameterised && declaredClass != Class::class.java) { - localTypeInformation.typeIdentifier.getLocalType(classLoaderFor(declaredClass)) + localTypeInformation.typeIdentifier.getLocalType(classloader) } else { declaredType } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt index 4e7fbb466b..dac25a17ad 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt @@ -103,7 +103,7 @@ object SerializerFactoryBuilder { customSerializerRegistry)) val fingerPrinter = overrideFingerPrinter ?: - TypeModellingFingerPrinter(customSerializerRegistry) + TypeModellingFingerPrinter(customSerializerRegistry, classCarpenter.classloader) val localSerializerFactory = DefaultLocalSerializerFactory( whitelist, diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt index 2697b107a8..3477c02a48 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt @@ -45,12 +45,12 @@ sealed class TypeIdentifier { * Obtain a nicely-formatted representation of the identified type, for help with debugging. */ fun prettyPrint(simplifyClassNames: Boolean = true): String = when(this) { - is TypeIdentifier.UnknownType -> "?" - is TypeIdentifier.TopType -> "*" - is TypeIdentifier.Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) - is TypeIdentifier.Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)" - is TypeIdentifier.ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]" - is TypeIdentifier.Parameterised -> + is UnknownType -> "?" + is TopType -> "*" + is Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) + is Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)" + is ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]" + is Parameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) + parameters.joinToString(", ", "<", ">") { it.prettyPrint(simplifyClassNames) } @@ -63,8 +63,6 @@ sealed class TypeIdentifier { // This method has locking. So we memo the value here. private val systemClassLoader: ClassLoader = ClassLoader.getSystemClassLoader() - fun classLoaderFor(clazz: Class<*>): ClassLoader = clazz.classLoader ?: systemClassLoader - /** * Obtain the [TypeIdentifier] for an erased Java class. * @@ -81,7 +79,7 @@ sealed class TypeIdentifier { * Obtain the [TypeIdentifier] for a Java [Type] (typically obtained by calling one of * [java.lang.reflect.Parameter.getAnnotatedType], * [java.lang.reflect.Field.getGenericType] or - * [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [Unknown]. + * [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [UnknownType]. * * @param type The [Type] to obtain a [TypeIdentifier] for. * @param resolutionContext Optionally, a [Type] which can be used to resolve type variables, for example a @@ -273,5 +271,5 @@ private class ReconstitutedParameterizedType( other.ownerType == ownerType && Arrays.equals(other.actualTypeArguments, actualTypeArguments) override fun hashCode(): Int = - Arrays.hashCode(actualTypeArguments) xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType) + actualTypeArguments.contentHashCode() xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType) } \ No newline at end of file diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt index c5d79ed41f..8965a5c8e1 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt @@ -5,7 +5,6 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.toBase64 import net.corda.serialization.internal.amqp.* import net.corda.serialization.internal.model.TypeIdentifier.* -import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor import java.lang.reflect.ParameterizedType /** @@ -31,6 +30,7 @@ interface FingerPrinter { */ class TypeModellingFingerPrinter( private val customTypeDescriptorLookup: CustomSerializerRegistry, + private val classLoader: ClassLoader, private val debugEnabled: Boolean = false) : FingerPrinter { private val cache: MutableMap = DefaultCacheProvider.createCache() @@ -42,7 +42,7 @@ class TypeModellingFingerPrinter( * the Fingerprinter cannot guarantee that. */ cache.getOrPut(typeInformation.typeIdentifier) { - FingerPrintingState(customTypeDescriptorLookup, FingerprintWriter(debugEnabled)) + FingerPrintingState(customTypeDescriptorLookup, classLoader, FingerprintWriter(debugEnabled)) .fingerprint(typeInformation) } } @@ -95,6 +95,7 @@ internal class FingerprintWriter(debugEnabled: Boolean = false) { */ private class FingerPrintingState( private val customSerializerRegistry: CustomSerializerRegistry, + private val classLoader: ClassLoader, private val writer: FingerprintWriter) { companion object { @@ -200,7 +201,7 @@ private class FingerPrintingState( private fun fingerprintName(type: LocalTypeInformation) { val identifier = type.typeIdentifier when (identifier) { - is TypeIdentifier.ArrayOf -> writer.write(identifier.componentType.name).writeArray() + is ArrayOf -> writer.write(identifier.componentType.name).writeArray() else -> writer.write(identifier.name) } } @@ -239,7 +240,7 @@ private class FingerPrintingState( val observedGenericType = if (observedType !is ParameterizedType && type.typeIdentifier is Parameterised && observedClass != Class::class.java) { - type.typeIdentifier.getLocalType(classLoaderFor(observedClass)) + type.typeIdentifier.getLocalType(classLoader) } else { observedType } @@ -259,6 +260,5 @@ private class FingerPrintingState( // and deserializing (assuming deserialization is occurring in a factory that didn't // serialise the object in the first place (and thus the cache lookup fails). This is also // true of Any, where we need Example and Example to have the same fingerprint - private fun hasSeen(type: TypeIdentifier) = (type in typesSeen) - && (type != TypeIdentifier.UnknownType) + private fun hasSeen(type: TypeIdentifier) = (type in typesSeen) && (type != UnknownType) } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt index 362972afc7..84c3a27e63 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt @@ -12,7 +12,7 @@ class TypeModellingFingerPrinterTests { val descriptorBasedSerializerRegistry = DefaultDescriptorBasedSerializerRegistry() val customRegistry = CachingCustomSerializerRegistry(descriptorBasedSerializerRegistry) - val fingerprinter = TypeModellingFingerPrinter(customRegistry, true) + val fingerprinter = TypeModellingFingerPrinter(customRegistry, ClassLoader.getSystemClassLoader(), true) // See https://r3-cev.atlassian.net/browse/CORDA-2266 @Test(timeout=300_000) From 56d0bbc03615d124d15aeab02a135c15c15af38e Mon Sep 17 00:00:00 2001 From: LankyDan Date: Thu, 18 Jun 2020 16:15:15 +0100 Subject: [PATCH 5/5] CORDA-3841 Check `isAnyCheckpointPersisted` in `startFlowInternal` (#6351) Only hit the database if `StateMachineState.isAnyCheckpointPersisted` returns true. Otherwise, there will be no checkpoint to retrieve from the database anyway. This can prevent errors due to a transient loss of connection to the database. Update tests after merging to 4.6 --- .../statemachine/StatemachineGeneralErrorHandlingTest.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt index 79bc69aec0..e0732cd316 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt @@ -1125,7 +1125,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(3, discharge) assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) - assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) } } @@ -1220,7 +1220,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(3, discharge) assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) - assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) } } @@ -1612,7 +1612,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(0, observation) assertEquals(0, aliceClient.stateMachinesSnapshot().size) assertEquals(0, charlieClient.stateMachinesSnapshot().size) - assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) } } @@ -1712,7 +1712,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(1, observation) assertEquals(1, aliceClient.stateMachinesSnapshot().size) assertEquals(1, charlieClient.stateMachinesSnapshot().size) - assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get()) + assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) } } } \ No newline at end of file