diff --git a/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt b/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt index 0e508959e8..553303870e 100644 --- a/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt +++ b/common/logging/src/main/kotlin/net/corda/common/logging/errorReporting/ErrorReporterImpl.kt @@ -1,6 +1,7 @@ package net.corda.common.logging.errorReporting import org.slf4j.Logger +import java.lang.Exception import java.text.MessageFormat import java.util.* @@ -31,6 +32,10 @@ internal class ErrorReporterImpl(private val resourceLocation: String, override fun report(error: ErrorCode<*>, logger: Logger) { val errorResource = ErrorResource.fromErrorCode(error, resourceLocation, locale) val message = "${errorResource.getErrorMessage(error.parameters.toTypedArray())} ${getErrorInfo(error)}" - logger.error(message) + if (error is Exception) { + logger.error(message, error) + } else { + logger.error(message) + } } } \ No newline at end of file diff --git a/common/logging/src/main/resources/error-codes/database-failed-startup.properties b/common/logging/src/main/resources/error-codes/database-failed-startup.properties index 996de3ba76..2c21cd3e9a 100644 --- a/common/logging/src/main/resources/error-codes/database-failed-startup.properties +++ b/common/logging/src/main/resources/error-codes/database-failed-startup.properties @@ -1,4 +1,4 @@ -errorTemplate = Failed to create the datasource. See the logs for further information and the cause. +errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause. shortDescription = The datasource could not be created for unknown reasons. actionsToFix = The logs in the logs directory should contain more information on what went wrong. aliases = \ No newline at end of file diff --git a/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties b/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties index 1abe8840bb..194292abf5 100644 --- a/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties +++ b/common/logging/src/main/resources/error-codes/database-failed-startup_en_US.properties @@ -1,3 +1,3 @@ -errorTemplate = Failed to create the datasource. See the logs for further information and the cause. +errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause. shortDescription = The datasource could not be created for unknown reasons. actionsToFix = The logs in the logs directory should contain more information on what went wrong. \ No newline at end of file diff --git a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt index d8697e9415..753325a0cd 100644 --- a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt +++ b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/DatabaseErrorsTest.kt @@ -6,7 +6,7 @@ import java.net.InetAddress class DatabaseErrorsTest : ErrorCodeTest(NodeDatabaseErrors::class.java) { override val dataForCodes = mapOf( NodeDatabaseErrors.COULD_NOT_CONNECT to listOf(), - NodeDatabaseErrors.FAILED_STARTUP to listOf(), + NodeDatabaseErrors.FAILED_STARTUP to listOf("This is a test message"), NodeDatabaseErrors.MISSING_DRIVER to listOf(), NodeDatabaseErrors.PASSWORD_REQUIRED_FOR_H2 to listOf(InetAddress.getLocalHost()) ) diff --git a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt index 40efb4e164..95f9d38141 100644 --- a/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt +++ b/common/logging/src/test/kotlin/net/corda/commmon/logging/errorReporting/ErrorReporterImplTest.kt @@ -7,6 +7,7 @@ import net.corda.common.logging.errorReporting.ErrorContextProvider import net.corda.common.logging.errorReporting.ErrorReporterImpl import org.junit.After import org.junit.Test +import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.anyString import org.mockito.Mockito import org.slf4j.Logger @@ -24,6 +25,7 @@ class ErrorReporterImplTest { private val loggerMock = Mockito.mock(Logger::class.java).also { Mockito.`when`(it.error(anyString())).then { logs.addAll(it.arguments) } + Mockito.`when`(it.error(anyString(), any(Exception::class.java))).then { params -> logs.addAll(params.arguments) } } private val contextProvider: ErrorContextProvider = object : ErrorContextProvider { @@ -39,7 +41,8 @@ class ErrorReporterImplTest { private enum class TestErrors : ErrorCodes { CASE1, CASE2, - CASE_3; + CASE_3, + CASE4; override val namespace = TestNamespaces.TEST.toString() } @@ -59,6 +62,11 @@ class ErrorReporterImplTest { override val parameters = listOf() } + private class TestError4(cause: Exception?) : Exception("This is test error 4", cause), ErrorCode { + override val code = TestErrors.CASE4 + override val parameters = listOf() + } + private fun createReporterImpl(localeTag: String?) : ErrorReporterImpl { val locale = if (localeTag != null) Locale.forLanguageTag(localeTag) else Locale.getDefault() return ErrorReporterImpl("errorReporting", locale, contextProvider) @@ -118,4 +126,12 @@ class ErrorReporterImplTest { testReporter.report(error, loggerMock) assertEquals(listOf("This is the third test message [Code: test-case-3 URL: $TEST_URL/en-US]"), logs) } + + @Test(timeout = 3_000) + fun `exception based error code logs the stack trace`() { + val error = TestError4(Exception("A test exception")) + val testReporter = createReporterImpl("en-US") + testReporter.report(error, loggerMock) + assertEquals(listOf("This is the fourth test message [Code: test-case4 URL: $TEST_URL/en-US]", error), logs) + } } \ No newline at end of file diff --git a/common/logging/src/test/resources/errorReporting/test-case4.properties b/common/logging/src/test/resources/errorReporting/test-case4.properties new file mode 100644 index 0000000000..e4911daacf --- /dev/null +++ b/common/logging/src/test/resources/errorReporting/test-case4.properties @@ -0,0 +1,4 @@ +errorTemplate = This is the fourth test message +shortDescription = Test description +actionsToFix = Actions +aliases = \ No newline at end of file diff --git a/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties b/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties new file mode 100644 index 0000000000..e4911daacf --- /dev/null +++ b/common/logging/src/test/resources/errorReporting/test-case4_en_US.properties @@ -0,0 +1,4 @@ +errorTemplate = This is the fourth test message +shortDescription = Test description +actionsToFix = Actions +aliases = \ No newline at end of file diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt index 349ab5e5a8..e0732cd316 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StatemachineGeneralErrorHandlingTest.kt @@ -4,6 +4,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.transitions.TopLevelTransition import net.corda.testing.core.ALICE_NAME @@ -11,6 +12,8 @@ import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity import org.junit.Ignore import org.junit.Test +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.TimeoutException import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -18,6 +21,10 @@ import kotlin.test.assertFailsWith @Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { + private companion object { + val executor: ExecutorService = Executors.newSingleThreadExecutor() + } + /** * Throws an exception when performing an [Action.SendInitial] action. * The exception is thrown 4 times. @@ -25,8 +32,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in * the hospital for observation. */ - @Test(timeout=300_000) - fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -79,7 +86,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -105,8 +115,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * succeeds and the flow finishes. */ - @Test(timeout=300_000) - fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -158,7 +168,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -185,8 +198,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries. * The flow should complete successfully as the error is swallowed. */ - @Test(timeout=300_000) - fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() { + @Test(timeout = 300_000) + fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -238,7 +251,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -270,8 +286,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to * verify that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -323,7 +339,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -356,8 +375,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * CORDA-3352 - it is currently hanging after putting the flow in for observation */ - @Test(timeout=300_000) -@Ignore + @Test(timeout = 300_000) + @Ignore fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) @@ -411,7 +430,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -443,8 +465,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to * verify that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -513,7 +535,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -540,8 +565,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * Each time the flow retries, it begins from the previous checkpoint where it suspended before failing. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -602,7 +627,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -629,8 +657,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight * observation. It is not ran again after this point */ - @Test(timeout=300_000) - fun `error during retry of a flow will force the flow into overnight observation`() { + @Test(timeout = 300_000) + fun `error during retry of a flow will force the flow into overnight observation`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -699,7 +727,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -729,8 +760,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * flow will still finish successfully. This is due to the even being scheduled as part of the retry and the failure in the database * commit occurs after this point. As the flow is already scheduled, the failure has not affect on it. */ - @Test(timeout=300_000) - fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { + @Test(timeout = 300_000) + fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -798,7 +829,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -828,8 +862,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * CORDA-3352 - it is currently hanging after putting the flow in for observation * */ - @Test(timeout=300_000) -@Ignore + @Test(timeout = 300_000) + @Ignore fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() { startDriver { val charlie = createNode(CHARLIE_NAME) @@ -883,7 +917,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -910,8 +947,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * * Each time the flow retries, it begins from the previous checkpoint where it suspended before failing. */ - @Test(timeout=300_000) - fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { startDriver { val charlie = createNode(CHARLIE_NAME) val alice = createBytemanNode(ALICE_NAME) @@ -975,7 +1012,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -994,6 +1034,196 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { } } + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 5 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 5 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state). + * + * The exception is thrown 7 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept for in for observation. + */ + @Test(timeout = 300_000) + fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val charlie = createNode(CHARLIE_NAME) + val alice = createBytemanNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 7 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + executor.execute { + aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()) + } + + // flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead + Thread.sleep(30.seconds.toMillis()) + + val output = getBytemanOutput(alice) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) + } + } + /** * Throws an exception when performing an [Action.CommitTransaction] event on a responding flow. The failure prevents the node from saving * its original checkpoint. @@ -1009,8 +1239,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify * that 3 retries are attempted before recovering. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1064,7 +1294,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -1104,8 +1337,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * able to recover when the node is restarted (by using the events). The initiating flow maintains the checkpoint as it is waiting for * the responding flow to recover and finish its flow. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1160,7 +1393,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy assertFailsWith { - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) } @@ -1192,8 +1428,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition * succeeds and the flow finishes. */ - @Test(timeout=300_000) - fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { + @Test(timeout = 300_000) + fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1258,7 +1494,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy - aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow( + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( 30.seconds ) @@ -1278,4 +1517,202 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() { assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) } } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 5 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * succeeds and the flow finishes. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database`() { + startDriver { + val charlie = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 5 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + val charlieClient = + CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + + val output = getBytemanOutput(charlie) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(0, observation) + assertEquals(0, aliceClient.stateMachinesSnapshot().size) + assertEquals(0, charlieClient.stateMachinesSnapshot().size) + assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) + } + } + + /** + * Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted + * state) on a responding node. + * + * The exception is thrown 7 times. + * + * An exception is also thrown from [CheckpointStorage.getCheckpoint]. + * + * This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when + * retrying the flow after it failed to commit it's original checkpoint. + * + * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition + * fails and is kept for in for observation. + */ + @Test(timeout = 300_000) + fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() { + startDriver { + val charlie = createBytemanNode(CHARLIE_NAME) + val alice = createNode(ALICE_NAME) + + val rules = """ + RULE Create Counter + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF createCounter("counter", $counter) + DO traceln("Counter created") + ENDRULE + + RULE Throw exception on executeCommitTransaction action + CLASS ${ActionExecutorImpl::class.java.name} + METHOD executeCommitTransaction + AT ENTRY + IF readCounter("counter") < 7 + DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die") + ENDRULE + + RULE Throw exception on getCheckpoint + INTERFACE ${CheckpointStorage::class.java.name} + METHOD getCheckpoint + AT ENTRY + IF true + DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available") + ENDRULE + + RULE Entering internal error staff member + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT ENTRY + IF true + DO traceln("Reached internal transition error staff member") + ENDRULE + + RULE Increment discharge counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ DISCHARGE + IF true + DO traceln("Byteman test - discharging") + ENDRULE + + RULE Increment observation counter + CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name} + METHOD consult + AT READ OVERNIGHT_OBSERVATION + IF true + DO traceln("Byteman test - overnight observation") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules) + + val aliceClient = + CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + val charlieClient = + CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy + + assertFailsWith { + aliceClient.startFlow( + StatemachineErrorHandlingTest::SendAMessageFlow, + charlie.nodeInfo.singleIdentity() + ).returnValue.getOrThrow( + 30.seconds + ) + } + + val output = getBytemanOutput(charlie) + + // Check the stdout for the lines generated by byteman + assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size) + assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size) + val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get() + assertEquals(3, discharge) + assertEquals(1, observation) + assertEquals(1, aliceClient.stateMachinesSnapshot().size) + assertEquals(1, charlieClient.stateMachinesSnapshot().size) + assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get()) + } + } } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt new file mode 100644 index 0000000000..6384bd3900 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/DataObject.kt @@ -0,0 +1,14 @@ +package net.corda.contracts.serialization.generics + +import net.corda.core.serialization.CordaSerializable + +@CordaSerializable +data class DataObject(val value: Long) : Comparable { + override fun toString(): String { + return "$value data points" + } + + override fun compareTo(other: DataObject): Int { + return value.compareTo(other.value) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt new file mode 100644 index 0000000000..4fcdae9da3 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/contracts/serialization/generics/GenericTypeContract.kt @@ -0,0 +1,37 @@ +package net.corda.contracts.serialization.generics + +import net.corda.core.contracts.CommandData +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.identity.AbstractParty +import net.corda.core.transactions.LedgerTransaction +import java.util.Optional + +@Suppress("unused") +class GenericTypeContract : Contract { + override fun verify(tx: LedgerTransaction) { + val state = tx.outputsOfType() + require(state.isNotEmpty()) { + "Requires at least one data state" + } + } + + @Suppress("CanBeParameter", "MemberVisibilityCanBePrivate") + class State(val owner: AbstractParty, val data: DataObject) : ContractState { + override val participants: List = listOf(owner) + + @Override + override fun toString(): String { + return data.toString() + } + } + + /** + * The [price] field is the important feature of the [Purchase] + * class because its type is [Optional] with a CorDapp-specific + * generic type parameter. It does not matter that the [price] + * is not used; it only matters that the [Purchase] command + * must be serialized as part of building a new transaction. + */ + class Purchase(val price: Optional) : CommandData +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt b/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt new file mode 100644 index 0000000000..2325d767b0 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/flows/serialization/generics/GenericTypeFlow.kt @@ -0,0 +1,27 @@ +package net.corda.flows.serialization.generics + +import co.paralleluniverse.fibers.Suspendable +import net.corda.contracts.serialization.generics.DataObject +import net.corda.contracts.serialization.generics.GenericTypeContract.Purchase +import net.corda.contracts.serialization.generics.GenericTypeContract.State +import net.corda.core.contracts.Command +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.transactions.TransactionBuilder +import java.util.Optional + +@StartableByRPC +class GenericTypeFlow(private val purchase: DataObject) : FlowLogic() { + @Suspendable + override fun call(): SecureHash { + val notary = serviceHub.networkMapCache.notaryIdentities[0] + val stx = serviceHub.signInitialTransaction( + TransactionBuilder(notary) + .addOutputState(State(ourIdentity, purchase)) + .addCommand(Command(Purchase(Optional.of(purchase)), ourIdentity.owningKey)) + ) + stx.verify(serviceHub, checkSufficientSignatures = false) + return stx.id + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt b/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt new file mode 100644 index 0000000000..4a093de5ba --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/ContractWithGenericTypeTest.kt @@ -0,0 +1,52 @@ +package net.corda.node + +import net.corda.client.rpc.CordaRPCClient +import net.corda.contracts.serialization.generics.DataObject +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.loggerFor +import net.corda.flows.serialization.generics.GenericTypeFlow +import net.corda.node.services.Permissions +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.User +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.Test + +@Suppress("FunctionName") +class ContractWithGenericTypeTest { + companion object { + const val DATA_VALUE = 5000L + + @JvmField + val logger = loggerFor() + } + + @Test(timeout=300_000) + fun `flow with generic type`() { + val user = User("u", "p", setOf(Permissions.all())) + driver(DriverParameters( + portAllocation = incrementalPortAllocation(), + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)), + cordappsForAllNodes = listOf( + cordappWithPackages("net.corda.flows.serialization.generics").signed(), + cordappWithPackages("net.corda.contracts.serialization.generics").signed() + ) + )) { + val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val txID = CordaRPCClient(hostAndPort = alice.rpcAddress) + .start(user.username, user.password) + .use { client -> + client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE)) + .returnValue + .getOrThrow() + } + logger.info("TX-ID=$txID") + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt new file mode 100644 index 0000000000..b788091232 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/DeterministicContractWithGenericTypeTest.kt @@ -0,0 +1,59 @@ +package net.corda.node.services + +import net.corda.client.rpc.CordaRPCClient +import net.corda.contracts.serialization.generics.DataObject +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.loggerFor +import net.corda.flows.serialization.generics.GenericTypeFlow +import net.corda.node.DeterministicSourcesRule +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.incrementalPortAllocation +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.User +import net.corda.testing.node.internal.cordappWithPackages +import org.junit.ClassRule +import org.junit.Test + +@Suppress("FunctionName") +class DeterministicContractWithGenericTypeTest { + companion object { + const val DATA_VALUE = 5000L + + @JvmField + val logger = loggerFor() + + @ClassRule + @JvmField + val djvmSources = DeterministicSourcesRule() + } + + @Test(timeout=300_000) + fun `test DJVM can deserialise command with generic type`() { + val user = User("u", "p", setOf(Permissions.all())) + driver(DriverParameters( + portAllocation = incrementalPortAllocation(), + startNodesInProcess = false, + notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)), + cordappsForAllNodes = listOf( + cordappWithPackages("net.corda.flows.serialization.generics").signed(), + cordappWithPackages("net.corda.contracts.serialization.generics").signed() + ), + djvmBootstrapSource = djvmSources.bootstrap, + djvmCordaSource = djvmSources.corda + )) { + val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val txID = CordaRPCClient(hostAndPort = alice.rpcAddress) + .start(user.username, user.password) + .use { client -> + client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE)) + .returnValue + .getOrThrow() + } + logger.info("TX-ID=$txID") + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 5a51195877..74dcee8c01 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -1361,11 +1361,14 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi "Could not find the database driver class. Please add it to the 'drivers' folder.", NodeDatabaseErrors.MISSING_DRIVER) ex is OutstandingDatabaseChangesException -> throw (DatabaseIncompatibleException(ex.message)) - else -> + else -> { + val msg = ex.message ?: ex::class.java.canonicalName throw CouldNotCreateDataSourceException( "Could not create the DataSource: ${ex.message}", NodeDatabaseErrors.FAILED_STARTUP, - cause = ex) + cause = ex, + parameters = listOf(msg)) + } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 341996a980..af5778868f 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -461,6 +461,7 @@ class DBCheckpointStorage( return session.createQuery(delete).executeUpdate() } + @Throws(SQLException::class) override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? { return getDBCheckpoint(id)?.toSerializedCheckpoint() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 8145df136c..fe395f7483 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -623,9 +623,8 @@ class SingleThreadedStateMachineManager( deduplicationHandler: DeduplicationHandler? ): CordaFuture> { - val flowAlreadyExists = mutex.locked { flows[flowId] != null } - - val existingCheckpoint = if (flowAlreadyExists) { + val existingFlow = mutex.locked { flows[flowId] } + val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) { // Load the flow's checkpoint // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) @@ -633,8 +632,10 @@ class SingleThreadedStateMachineManager( val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) if (checkpoint == null) { return openFuture>().mapError { - IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " + - "Something is very wrong. The flow will not retry.") + IllegalStateException( + "Unable to deserialize database checkpoint for flow $flowId. " + + "Something is very wrong. The flow will not retry." + ) } } else { checkpoint diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index af7d197d50..1f60bc0f2a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -146,6 +146,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, val payload = RejectSessionMessage(message, secureRandom.nextLong()) val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload) + log.info("Sending session initiation error back to $sender", error) + flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) event.deduplicationHandler.afterDatabaseTransaction() } diff --git a/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt b/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt index bc614f2f95..96f8c44a03 100644 --- a/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt +++ b/serialization-djvm/src/main/kotlin/net/corda/serialization/djvm/SandboxSerializerFactoryFactory.kt @@ -75,7 +75,7 @@ class SandboxSerializerFactoryFactory( ) ) - val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry) + val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry, classLoader) val localSerializerFactory = DefaultLocalSerializerFactory( whitelist = context.whitelist, diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt index 9cf064a70b..9b0ce7b9ae 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt @@ -7,7 +7,6 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.serialization.internal.model.* import net.corda.serialization.internal.model.TypeIdentifier.* -import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor import org.apache.qpid.proton.amqp.Symbol import java.lang.reflect.ParameterizedType import java.lang.reflect.Type @@ -161,7 +160,7 @@ class DefaultLocalSerializerFactory( val declaredGenericType = if (declaredType !is ParameterizedType && localTypeInformation.typeIdentifier is Parameterised && declaredClass != Class::class.java) { - localTypeInformation.typeIdentifier.getLocalType(classLoaderFor(declaredClass)) + localTypeInformation.typeIdentifier.getLocalType(classloader) } else { declaredType } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt index 27650621d7..e1d0aaee77 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactoryBuilder.kt @@ -101,7 +101,7 @@ object SerializerFactoryBuilder { val localTypeModel = ConfigurableLocalTypeModel(typeModelConfiguration) val fingerPrinter = overrideFingerPrinter ?: - TypeModellingFingerPrinter(customSerializerRegistry) + TypeModellingFingerPrinter(customSerializerRegistry, classCarpenter.classloader) val localSerializerFactory = DefaultLocalSerializerFactory( whitelist, diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt index 2697b107a8..3477c02a48 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeIdentifier.kt @@ -45,12 +45,12 @@ sealed class TypeIdentifier { * Obtain a nicely-formatted representation of the identified type, for help with debugging. */ fun prettyPrint(simplifyClassNames: Boolean = true): String = when(this) { - is TypeIdentifier.UnknownType -> "?" - is TypeIdentifier.TopType -> "*" - is TypeIdentifier.Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) - is TypeIdentifier.Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)" - is TypeIdentifier.ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]" - is TypeIdentifier.Parameterised -> + is UnknownType -> "?" + is TopType -> "*" + is Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) + is Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)" + is ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]" + is Parameterised -> name.simplifyClassNameIfRequired(simplifyClassNames) + parameters.joinToString(", ", "<", ">") { it.prettyPrint(simplifyClassNames) } @@ -63,8 +63,6 @@ sealed class TypeIdentifier { // This method has locking. So we memo the value here. private val systemClassLoader: ClassLoader = ClassLoader.getSystemClassLoader() - fun classLoaderFor(clazz: Class<*>): ClassLoader = clazz.classLoader ?: systemClassLoader - /** * Obtain the [TypeIdentifier] for an erased Java class. * @@ -81,7 +79,7 @@ sealed class TypeIdentifier { * Obtain the [TypeIdentifier] for a Java [Type] (typically obtained by calling one of * [java.lang.reflect.Parameter.getAnnotatedType], * [java.lang.reflect.Field.getGenericType] or - * [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [Unknown]. + * [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [UnknownType]. * * @param type The [Type] to obtain a [TypeIdentifier] for. * @param resolutionContext Optionally, a [Type] which can be used to resolve type variables, for example a @@ -273,5 +271,5 @@ private class ReconstitutedParameterizedType( other.ownerType == ownerType && Arrays.equals(other.actualTypeArguments, actualTypeArguments) override fun hashCode(): Int = - Arrays.hashCode(actualTypeArguments) xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType) + actualTypeArguments.contentHashCode() xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType) } \ No newline at end of file diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt index c5d79ed41f..8965a5c8e1 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/model/TypeModellingFingerPrinter.kt @@ -5,7 +5,6 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.toBase64 import net.corda.serialization.internal.amqp.* import net.corda.serialization.internal.model.TypeIdentifier.* -import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor import java.lang.reflect.ParameterizedType /** @@ -31,6 +30,7 @@ interface FingerPrinter { */ class TypeModellingFingerPrinter( private val customTypeDescriptorLookup: CustomSerializerRegistry, + private val classLoader: ClassLoader, private val debugEnabled: Boolean = false) : FingerPrinter { private val cache: MutableMap = DefaultCacheProvider.createCache() @@ -42,7 +42,7 @@ class TypeModellingFingerPrinter( * the Fingerprinter cannot guarantee that. */ cache.getOrPut(typeInformation.typeIdentifier) { - FingerPrintingState(customTypeDescriptorLookup, FingerprintWriter(debugEnabled)) + FingerPrintingState(customTypeDescriptorLookup, classLoader, FingerprintWriter(debugEnabled)) .fingerprint(typeInformation) } } @@ -95,6 +95,7 @@ internal class FingerprintWriter(debugEnabled: Boolean = false) { */ private class FingerPrintingState( private val customSerializerRegistry: CustomSerializerRegistry, + private val classLoader: ClassLoader, private val writer: FingerprintWriter) { companion object { @@ -200,7 +201,7 @@ private class FingerPrintingState( private fun fingerprintName(type: LocalTypeInformation) { val identifier = type.typeIdentifier when (identifier) { - is TypeIdentifier.ArrayOf -> writer.write(identifier.componentType.name).writeArray() + is ArrayOf -> writer.write(identifier.componentType.name).writeArray() else -> writer.write(identifier.name) } } @@ -239,7 +240,7 @@ private class FingerPrintingState( val observedGenericType = if (observedType !is ParameterizedType && type.typeIdentifier is Parameterised && observedClass != Class::class.java) { - type.typeIdentifier.getLocalType(classLoaderFor(observedClass)) + type.typeIdentifier.getLocalType(classLoader) } else { observedType } @@ -259,6 +260,5 @@ private class FingerPrintingState( // and deserializing (assuming deserialization is occurring in a factory that didn't // serialise the object in the first place (and thus the cache lookup fails). This is also // true of Any, where we need Example and Example to have the same fingerprint - private fun hasSeen(type: TypeIdentifier) = (type in typesSeen) - && (type != TypeIdentifier.UnknownType) + private fun hasSeen(type: TypeIdentifier) = (type in typesSeen) && (type != UnknownType) } diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt index 362972afc7..84c3a27e63 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/TypeModellingFingerPrinterTests.kt @@ -12,7 +12,7 @@ class TypeModellingFingerPrinterTests { val descriptorBasedSerializerRegistry = DefaultDescriptorBasedSerializerRegistry() val customRegistry = CachingCustomSerializerRegistry(descriptorBasedSerializerRegistry) - val fingerprinter = TypeModellingFingerPrinter(customRegistry, true) + val fingerprinter = TypeModellingFingerPrinter(customRegistry, ClassLoader.getSystemClassLoader(), true) // See https://r3-cev.atlassian.net/browse/CORDA-2266 @Test(timeout=300_000)