mirror of
https://github.com/corda/corda.git
synced 2024-12-24 07:06:44 +00:00
Merge branch 'release/os/4.5' into dan/os-4.5-to-4.6-merge-2020-06-18
# Conflicts: # node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt # node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
This commit is contained in:
commit
e8b17ff7b9
@ -1,6 +1,7 @@
|
||||
package net.corda.common.logging.errorReporting
|
||||
|
||||
import org.slf4j.Logger
|
||||
import java.lang.Exception
|
||||
import java.text.MessageFormat
|
||||
import java.util.*
|
||||
|
||||
@ -31,6 +32,10 @@ internal class ErrorReporterImpl(private val resourceLocation: String,
|
||||
override fun report(error: ErrorCode<*>, logger: Logger) {
|
||||
val errorResource = ErrorResource.fromErrorCode(error, resourceLocation, locale)
|
||||
val message = "${errorResource.getErrorMessage(error.parameters.toTypedArray())} ${getErrorInfo(error)}"
|
||||
logger.error(message)
|
||||
if (error is Exception) {
|
||||
logger.error(message, error)
|
||||
} else {
|
||||
logger.error(message)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
errorTemplate = Failed to create the datasource. See the logs for further information and the cause.
|
||||
errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause.
|
||||
shortDescription = The datasource could not be created for unknown reasons.
|
||||
actionsToFix = The logs in the logs directory should contain more information on what went wrong.
|
||||
aliases =
|
@ -1,3 +1,3 @@
|
||||
errorTemplate = Failed to create the datasource. See the logs for further information and the cause.
|
||||
errorTemplate = Failed to create the datasource: {0}. See the logs for further information and the cause.
|
||||
shortDescription = The datasource could not be created for unknown reasons.
|
||||
actionsToFix = The logs in the logs directory should contain more information on what went wrong.
|
@ -6,7 +6,7 @@ import java.net.InetAddress
|
||||
class DatabaseErrorsTest : ErrorCodeTest<NodeDatabaseErrors>(NodeDatabaseErrors::class.java) {
|
||||
override val dataForCodes = mapOf(
|
||||
NodeDatabaseErrors.COULD_NOT_CONNECT to listOf<Any>(),
|
||||
NodeDatabaseErrors.FAILED_STARTUP to listOf(),
|
||||
NodeDatabaseErrors.FAILED_STARTUP to listOf("This is a test message"),
|
||||
NodeDatabaseErrors.MISSING_DRIVER to listOf(),
|
||||
NodeDatabaseErrors.PASSWORD_REQUIRED_FOR_H2 to listOf(InetAddress.getLocalHost())
|
||||
)
|
||||
|
@ -7,6 +7,7 @@ import net.corda.common.logging.errorReporting.ErrorContextProvider
|
||||
import net.corda.common.logging.errorReporting.ErrorReporterImpl
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import org.mockito.ArgumentMatchers.any
|
||||
import org.mockito.ArgumentMatchers.anyString
|
||||
import org.mockito.Mockito
|
||||
import org.slf4j.Logger
|
||||
@ -24,6 +25,7 @@ class ErrorReporterImplTest {
|
||||
|
||||
private val loggerMock = Mockito.mock(Logger::class.java).also {
|
||||
Mockito.`when`(it.error(anyString())).then { logs.addAll(it.arguments) }
|
||||
Mockito.`when`(it.error(anyString(), any(Exception::class.java))).then { params -> logs.addAll(params.arguments) }
|
||||
}
|
||||
|
||||
private val contextProvider: ErrorContextProvider = object : ErrorContextProvider {
|
||||
@ -39,7 +41,8 @@ class ErrorReporterImplTest {
|
||||
private enum class TestErrors : ErrorCodes {
|
||||
CASE1,
|
||||
CASE2,
|
||||
CASE_3;
|
||||
CASE_3,
|
||||
CASE4;
|
||||
|
||||
override val namespace = TestNamespaces.TEST.toString()
|
||||
}
|
||||
@ -59,6 +62,11 @@ class ErrorReporterImplTest {
|
||||
override val parameters = listOf<Any>()
|
||||
}
|
||||
|
||||
private class TestError4(cause: Exception?) : Exception("This is test error 4", cause), ErrorCode<TestErrors> {
|
||||
override val code = TestErrors.CASE4
|
||||
override val parameters = listOf<Any>()
|
||||
}
|
||||
|
||||
private fun createReporterImpl(localeTag: String?) : ErrorReporterImpl {
|
||||
val locale = if (localeTag != null) Locale.forLanguageTag(localeTag) else Locale.getDefault()
|
||||
return ErrorReporterImpl("errorReporting", locale, contextProvider)
|
||||
@ -118,4 +126,12 @@ class ErrorReporterImplTest {
|
||||
testReporter.report(error, loggerMock)
|
||||
assertEquals(listOf("This is the third test message [Code: test-case-3 URL: $TEST_URL/en-US]"), logs)
|
||||
}
|
||||
|
||||
@Test(timeout = 3_000)
|
||||
fun `exception based error code logs the stack trace`() {
|
||||
val error = TestError4(Exception("A test exception"))
|
||||
val testReporter = createReporterImpl("en-US")
|
||||
testReporter.report(error, loggerMock)
|
||||
assertEquals(listOf("This is the fourth test message [Code: test-case4 URL: $TEST_URL/en-US]", error), logs)
|
||||
}
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
errorTemplate = This is the fourth test message
|
||||
shortDescription = Test description
|
||||
actionsToFix = Actions
|
||||
aliases =
|
@ -0,0 +1,4 @@
|
||||
errorTemplate = This is the fourth test message
|
||||
shortDescription = Test description
|
||||
actionsToFix = Actions
|
||||
aliases =
|
@ -4,6 +4,7 @@ import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.statemachine.transitions.TopLevelTransition
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
@ -11,6 +12,8 @@ import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
@ -18,6 +21,10 @@ import kotlin.test.assertFailsWith
|
||||
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
|
||||
class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
|
||||
private companion object {
|
||||
val executor: ExecutorService = Executors.newSingleThreadExecutor()
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.SendInitial] action.
|
||||
* The exception is thrown 4 times.
|
||||
@ -25,8 +32,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in
|
||||
* the hospital for observation.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -79,7 +86,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -105,8 +115,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -158,7 +168,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -185,8 +198,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
|
||||
* The flow should complete successfully as the error is swallowed.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -238,7 +251,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -270,8 +286,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to
|
||||
* verify that 3 retries are attempted before recovering.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -323,7 +339,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -356,8 +375,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
*
|
||||
* CORDA-3352 - it is currently hanging after putting the flow in for observation
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
@Ignore
|
||||
@Test(timeout = 300_000)
|
||||
@Ignore
|
||||
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
@ -411,7 +430,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -443,8 +465,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to
|
||||
* verify that 3 retries are attempted before recovering.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -513,7 +535,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -540,8 +565,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
*
|
||||
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -602,7 +627,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -629,8 +657,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* The flow is discharged and replayed from the hospital once. After failing during the replay, the flow is forced into overnight
|
||||
* observation. It is not ran again after this point
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during retry of a flow will force the flow into overnight observation`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during retry of a flow will force the flow into overnight observation`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -699,7 +727,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -729,8 +760,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* flow will still finish successfully. This is due to the even being scheduled as part of the retry and the failure in the database
|
||||
* commit occurs after this point. As the flow is already scheduled, the failure has not affect on it.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during commit transaction action when retrying a flow will retry the flow again and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -798,7 +829,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -828,8 +862,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* CORDA-3352 - it is currently hanging after putting the flow in for observation
|
||||
*
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
@Ignore
|
||||
@Test(timeout = 300_000)
|
||||
@Ignore
|
||||
fun `error during retrying a flow that failed when committing its original checkpoint will force the flow into overnight observation`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
@ -883,7 +917,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -910,8 +947,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
*
|
||||
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
@ -975,7 +1012,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -994,6 +1034,196 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state).
|
||||
*
|
||||
* The exception is thrown 5 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow can be retried when there is a transient connection error to the database`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
|
||||
RULE Entering internal error staff member
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Reached internal transition error staff member")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment discharge counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ DISCHARGE
|
||||
IF true
|
||||
DO traceln("Byteman test - discharging")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment observation counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ OVERNIGHT_OBSERVATION
|
||||
IF true
|
||||
DO traceln("Byteman test - overnight observation")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules)
|
||||
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
val output = getBytemanOutput(alice)
|
||||
|
||||
// Check the stdout for the lines generated by byteman
|
||||
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
|
||||
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
|
||||
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
|
||||
assertEquals(3, discharge)
|
||||
assertEquals(0, observation)
|
||||
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
|
||||
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state).
|
||||
*
|
||||
* The exception is thrown 7 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* fails and is kept for in for observation.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createNode(CHARLIE_NAME)
|
||||
val alice = createBytemanNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
|
||||
RULE Entering internal error staff member
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Reached internal transition error staff member")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment discharge counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ DISCHARGE
|
||||
IF true
|
||||
DO traceln("Byteman test - discharging")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment observation counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ OVERNIGHT_OBSERVATION
|
||||
IF true
|
||||
DO traceln("Byteman test - overnight observation")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules)
|
||||
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
executor.execute {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
|
||||
}
|
||||
|
||||
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
|
||||
Thread.sleep(30.seconds.toMillis())
|
||||
|
||||
val output = getBytemanOutput(alice)
|
||||
|
||||
// Check the stdout for the lines generated by byteman
|
||||
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
|
||||
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
|
||||
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
|
||||
assertEquals(3, discharge)
|
||||
assertEquals(1, observation)
|
||||
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
|
||||
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event on a responding flow. The failure prevents the node from saving
|
||||
* its original checkpoint.
|
||||
@ -1009,8 +1239,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
|
||||
* that 3 retries are attempted before recovering.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
@ -1064,7 +1294,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val charlieClient =
|
||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -1104,8 +1337,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* able to recover when the node is restarted (by using the events). The initiating flow maintains the checkpoint as it is waiting for
|
||||
* the responding flow to recover and finish its flow.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
@ -1160,7 +1393,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
@ -1192,8 +1428,8 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
|
||||
startDriver {
|
||||
val charlie = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
@ -1258,7 +1494,10 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
val charlieClient =
|
||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
@ -1278,4 +1517,202 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
|
||||
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state) on a responding node.
|
||||
*
|
||||
* The exception is thrown 5 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* succeeds and the flow finishes.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
|
||||
startDriver {
|
||||
val charlie = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 5
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
|
||||
RULE Entering internal error staff member
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Reached internal transition error staff member")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment discharge counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ DISCHARGE
|
||||
IF true
|
||||
DO traceln("Byteman test - discharging")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment observation counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ OVERNIGHT_OBSERVATION
|
||||
IF true
|
||||
DO traceln("Byteman test - overnight observation")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules)
|
||||
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
val charlieClient =
|
||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
|
||||
val output = getBytemanOutput(charlie)
|
||||
|
||||
// Check the stdout for the lines generated by byteman
|
||||
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
|
||||
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
|
||||
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
|
||||
assertEquals(3, discharge)
|
||||
assertEquals(0, observation)
|
||||
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
|
||||
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
|
||||
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||
* state) on a responding node.
|
||||
*
|
||||
* The exception is thrown 7 times.
|
||||
*
|
||||
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
|
||||
*
|
||||
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
|
||||
* retrying the flow after it failed to commit it's original checkpoint.
|
||||
*
|
||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||
* fails and is kept for in for observation.
|
||||
*/
|
||||
@Test(timeout = 300_000)
|
||||
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
|
||||
startDriver {
|
||||
val charlie = createBytemanNode(CHARLIE_NAME)
|
||||
val alice = createNode(ALICE_NAME)
|
||||
|
||||
val rules = """
|
||||
RULE Create Counter
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF createCounter("counter", $counter)
|
||||
DO traceln("Counter created")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on executeCommitTransaction action
|
||||
CLASS ${ActionExecutorImpl::class.java.name}
|
||||
METHOD executeCommitTransaction
|
||||
AT ENTRY
|
||||
IF readCounter("counter") < 7
|
||||
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
|
||||
ENDRULE
|
||||
|
||||
RULE Throw exception on getCheckpoint
|
||||
INTERFACE ${CheckpointStorage::class.java.name}
|
||||
METHOD getCheckpoint
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
|
||||
ENDRULE
|
||||
|
||||
RULE Entering internal error staff member
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT ENTRY
|
||||
IF true
|
||||
DO traceln("Reached internal transition error staff member")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment discharge counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ DISCHARGE
|
||||
IF true
|
||||
DO traceln("Byteman test - discharging")
|
||||
ENDRULE
|
||||
|
||||
RULE Increment observation counter
|
||||
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
|
||||
METHOD consult
|
||||
AT READ OVERNIGHT_OBSERVATION
|
||||
IF true
|
||||
DO traceln("Byteman test - overnight observation")
|
||||
ENDRULE
|
||||
""".trimIndent()
|
||||
|
||||
submitBytemanRules(rules)
|
||||
|
||||
val aliceClient =
|
||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
val charlieClient =
|
||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceClient.startFlow(
|
||||
StatemachineErrorHandlingTest::SendAMessageFlow,
|
||||
charlie.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(
|
||||
30.seconds
|
||||
)
|
||||
}
|
||||
|
||||
val output = getBytemanOutput(charlie)
|
||||
|
||||
// Check the stdout for the lines generated by byteman
|
||||
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
|
||||
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
|
||||
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
|
||||
assertEquals(3, discharge)
|
||||
assertEquals(1, observation)
|
||||
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
|
||||
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
|
||||
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package net.corda.contracts.serialization.generics
|
||||
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
||||
@CordaSerializable
|
||||
data class DataObject(val value: Long) : Comparable<DataObject> {
|
||||
override fun toString(): String {
|
||||
return "$value data points"
|
||||
}
|
||||
|
||||
override fun compareTo(other: DataObject): Int {
|
||||
return value.compareTo(other.value)
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
package net.corda.contracts.serialization.generics
|
||||
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import java.util.Optional
|
||||
|
||||
@Suppress("unused")
|
||||
class GenericTypeContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
val state = tx.outputsOfType<State>()
|
||||
require(state.isNotEmpty()) {
|
||||
"Requires at least one data state"
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("CanBeParameter", "MemberVisibilityCanBePrivate")
|
||||
class State(val owner: AbstractParty, val data: DataObject) : ContractState {
|
||||
override val participants: List<AbstractParty> = listOf(owner)
|
||||
|
||||
@Override
|
||||
override fun toString(): String {
|
||||
return data.toString()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The [price] field is the important feature of the [Purchase]
|
||||
* class because its type is [Optional] with a CorDapp-specific
|
||||
* generic type parameter. It does not matter that the [price]
|
||||
* is not used; it only matters that the [Purchase] command
|
||||
* must be serialized as part of building a new transaction.
|
||||
*/
|
||||
class Purchase(val price: Optional<DataObject>) : CommandData
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package net.corda.flows.serialization.generics
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.contracts.serialization.generics.DataObject
|
||||
import net.corda.contracts.serialization.generics.GenericTypeContract.Purchase
|
||||
import net.corda.contracts.serialization.generics.GenericTypeContract.State
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import java.util.Optional
|
||||
|
||||
@StartableByRPC
|
||||
class GenericTypeFlow(private val purchase: DataObject) : FlowLogic<SecureHash>() {
|
||||
@Suspendable
|
||||
override fun call(): SecureHash {
|
||||
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
||||
val stx = serviceHub.signInitialTransaction(
|
||||
TransactionBuilder(notary)
|
||||
.addOutputState(State(ourIdentity, purchase))
|
||||
.addCommand(Command(Purchase(Optional.of(purchase)), ourIdentity.owningKey))
|
||||
)
|
||||
stx.verify(serviceHub, checkSufficientSignatures = false)
|
||||
return stx.id
|
||||
}
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
package net.corda.node
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.contracts.serialization.generics.DataObject
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.flows.serialization.generics.GenericTypeFlow
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import org.junit.Test
|
||||
|
||||
@Suppress("FunctionName")
|
||||
class ContractWithGenericTypeTest {
|
||||
companion object {
|
||||
const val DATA_VALUE = 5000L
|
||||
|
||||
@JvmField
|
||||
val logger = loggerFor<ContractWithGenericTypeTest>()
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow with generic type`() {
|
||||
val user = User("u", "p", setOf(Permissions.all()))
|
||||
driver(DriverParameters(
|
||||
portAllocation = incrementalPortAllocation(),
|
||||
startNodesInProcess = false,
|
||||
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)),
|
||||
cordappsForAllNodes = listOf(
|
||||
cordappWithPackages("net.corda.flows.serialization.generics").signed(),
|
||||
cordappWithPackages("net.corda.contracts.serialization.generics").signed()
|
||||
)
|
||||
)) {
|
||||
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val txID = CordaRPCClient(hostAndPort = alice.rpcAddress)
|
||||
.start(user.username, user.password)
|
||||
.use { client ->
|
||||
client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE))
|
||||
.returnValue
|
||||
.getOrThrow()
|
||||
}
|
||||
logger.info("TX-ID=$txID")
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.contracts.serialization.generics.DataObject
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.flows.serialization.generics.GenericTypeFlow
|
||||
import net.corda.node.DeterministicSourcesRule
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.node.NotarySpec
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
|
||||
@Suppress("FunctionName")
|
||||
class DeterministicContractWithGenericTypeTest {
|
||||
companion object {
|
||||
const val DATA_VALUE = 5000L
|
||||
|
||||
@JvmField
|
||||
val logger = loggerFor<DeterministicContractWithGenericTypeTest>()
|
||||
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val djvmSources = DeterministicSourcesRule()
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test DJVM can deserialise command with generic type`() {
|
||||
val user = User("u", "p", setOf(Permissions.all()))
|
||||
driver(DriverParameters(
|
||||
portAllocation = incrementalPortAllocation(),
|
||||
startNodesInProcess = false,
|
||||
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, validating = true)),
|
||||
cordappsForAllNodes = listOf(
|
||||
cordappWithPackages("net.corda.flows.serialization.generics").signed(),
|
||||
cordappWithPackages("net.corda.contracts.serialization.generics").signed()
|
||||
),
|
||||
djvmBootstrapSource = djvmSources.bootstrap,
|
||||
djvmCordaSource = djvmSources.corda
|
||||
)) {
|
||||
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val txID = CordaRPCClient(hostAndPort = alice.rpcAddress)
|
||||
.start(user.username, user.password)
|
||||
.use { client ->
|
||||
client.proxy.startFlow(::GenericTypeFlow, DataObject(DATA_VALUE))
|
||||
.returnValue
|
||||
.getOrThrow()
|
||||
}
|
||||
logger.info("TX-ID=$txID")
|
||||
}
|
||||
}
|
||||
}
|
@ -1361,11 +1361,14 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi
|
||||
"Could not find the database driver class. Please add it to the 'drivers' folder.",
|
||||
NodeDatabaseErrors.MISSING_DRIVER)
|
||||
ex is OutstandingDatabaseChangesException -> throw (DatabaseIncompatibleException(ex.message))
|
||||
else ->
|
||||
else -> {
|
||||
val msg = ex.message ?: ex::class.java.canonicalName
|
||||
throw CouldNotCreateDataSourceException(
|
||||
"Could not create the DataSource: ${ex.message}",
|
||||
NodeDatabaseErrors.FAILED_STARTUP,
|
||||
cause = ex)
|
||||
cause = ex,
|
||||
parameters = listOf(msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -461,6 +461,7 @@ class DBCheckpointStorage(
|
||||
return session.createQuery(delete).executeUpdate()
|
||||
}
|
||||
|
||||
@Throws(SQLException::class)
|
||||
override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? {
|
||||
return getDBCheckpoint(id)?.toSerializedCheckpoint()
|
||||
}
|
||||
|
@ -623,9 +623,8 @@ class SingleThreadedStateMachineManager(
|
||||
deduplicationHandler: DeduplicationHandler?
|
||||
): CordaFuture<FlowStateMachine<A>> {
|
||||
|
||||
val flowAlreadyExists = mutex.locked { flows[flowId] != null }
|
||||
|
||||
val existingCheckpoint = if (flowAlreadyExists) {
|
||||
val existingFlow = mutex.locked { flows[flowId] }
|
||||
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState?.value?.isAnyCheckpointPersisted == true) {
|
||||
// Load the flow's checkpoint
|
||||
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
|
||||
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
|
||||
@ -633,8 +632,10 @@ class SingleThreadedStateMachineManager(
|
||||
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
|
||||
if (checkpoint == null) {
|
||||
return openFuture<FlowStateMachine<A>>().mapError {
|
||||
IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " +
|
||||
"Something is very wrong. The flow will not retry.")
|
||||
IllegalStateException(
|
||||
"Unable to deserialize database checkpoint for flow $flowId. " +
|
||||
"Something is very wrong. The flow will not retry."
|
||||
)
|
||||
}
|
||||
} else {
|
||||
checkpoint
|
||||
|
@ -146,6 +146,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
val payload = RejectSessionMessage(message, secureRandom.nextLong())
|
||||
val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload)
|
||||
|
||||
log.info("Sending session initiation error back to $sender", error)
|
||||
|
||||
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
||||
event.deduplicationHandler.afterDatabaseTransaction()
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ class SandboxSerializerFactoryFactory(
|
||||
)
|
||||
)
|
||||
|
||||
val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry)
|
||||
val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry, classLoader)
|
||||
|
||||
val localSerializerFactory = DefaultLocalSerializerFactory(
|
||||
whitelist = context.whitelist,
|
||||
|
@ -7,7 +7,6 @@ import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.serialization.internal.model.*
|
||||
import net.corda.serialization.internal.model.TypeIdentifier.*
|
||||
import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor
|
||||
import org.apache.qpid.proton.amqp.Symbol
|
||||
import java.lang.reflect.ParameterizedType
|
||||
import java.lang.reflect.Type
|
||||
@ -161,7 +160,7 @@ class DefaultLocalSerializerFactory(
|
||||
val declaredGenericType = if (declaredType !is ParameterizedType
|
||||
&& localTypeInformation.typeIdentifier is Parameterised
|
||||
&& declaredClass != Class::class.java) {
|
||||
localTypeInformation.typeIdentifier.getLocalType(classLoaderFor(declaredClass))
|
||||
localTypeInformation.typeIdentifier.getLocalType(classloader)
|
||||
} else {
|
||||
declaredType
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ object SerializerFactoryBuilder {
|
||||
val localTypeModel = ConfigurableLocalTypeModel(typeModelConfiguration)
|
||||
|
||||
val fingerPrinter = overrideFingerPrinter ?:
|
||||
TypeModellingFingerPrinter(customSerializerRegistry)
|
||||
TypeModellingFingerPrinter(customSerializerRegistry, classCarpenter.classloader)
|
||||
|
||||
val localSerializerFactory = DefaultLocalSerializerFactory(
|
||||
whitelist,
|
||||
|
@ -45,12 +45,12 @@ sealed class TypeIdentifier {
|
||||
* Obtain a nicely-formatted representation of the identified type, for help with debugging.
|
||||
*/
|
||||
fun prettyPrint(simplifyClassNames: Boolean = true): String = when(this) {
|
||||
is TypeIdentifier.UnknownType -> "?"
|
||||
is TypeIdentifier.TopType -> "*"
|
||||
is TypeIdentifier.Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames)
|
||||
is TypeIdentifier.Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)"
|
||||
is TypeIdentifier.ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]"
|
||||
is TypeIdentifier.Parameterised ->
|
||||
is UnknownType -> "?"
|
||||
is TopType -> "*"
|
||||
is Unparameterised -> name.simplifyClassNameIfRequired(simplifyClassNames)
|
||||
is Erased -> "${name.simplifyClassNameIfRequired(simplifyClassNames)} (erased)"
|
||||
is ArrayOf -> "${componentType.prettyPrint(simplifyClassNames)}[]"
|
||||
is Parameterised ->
|
||||
name.simplifyClassNameIfRequired(simplifyClassNames) + parameters.joinToString(", ", "<", ">") {
|
||||
it.prettyPrint(simplifyClassNames)
|
||||
}
|
||||
@ -63,8 +63,6 @@ sealed class TypeIdentifier {
|
||||
// This method has locking. So we memo the value here.
|
||||
private val systemClassLoader: ClassLoader = ClassLoader.getSystemClassLoader()
|
||||
|
||||
fun classLoaderFor(clazz: Class<*>): ClassLoader = clazz.classLoader ?: systemClassLoader
|
||||
|
||||
/**
|
||||
* Obtain the [TypeIdentifier] for an erased Java class.
|
||||
*
|
||||
@ -81,7 +79,7 @@ sealed class TypeIdentifier {
|
||||
* Obtain the [TypeIdentifier] for a Java [Type] (typically obtained by calling one of
|
||||
* [java.lang.reflect.Parameter.getAnnotatedType],
|
||||
* [java.lang.reflect.Field.getGenericType] or
|
||||
* [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [Unknown].
|
||||
* [java.lang.reflect.Method.getGenericReturnType]). Wildcard types and type variables are converted to [UnknownType].
|
||||
*
|
||||
* @param type The [Type] to obtain a [TypeIdentifier] for.
|
||||
* @param resolutionContext Optionally, a [Type] which can be used to resolve type variables, for example a
|
||||
@ -273,5 +271,5 @@ private class ReconstitutedParameterizedType(
|
||||
other.ownerType == ownerType &&
|
||||
Arrays.equals(other.actualTypeArguments, actualTypeArguments)
|
||||
override fun hashCode(): Int =
|
||||
Arrays.hashCode(actualTypeArguments) xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType)
|
||||
actualTypeArguments.contentHashCode() xor Objects.hashCode(ownerType) xor Objects.hashCode(rawType)
|
||||
}
|
@ -5,7 +5,6 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.toBase64
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import net.corda.serialization.internal.model.TypeIdentifier.*
|
||||
import net.corda.serialization.internal.model.TypeIdentifier.Companion.classLoaderFor
|
||||
import java.lang.reflect.ParameterizedType
|
||||
|
||||
/**
|
||||
@ -31,6 +30,7 @@ interface FingerPrinter {
|
||||
*/
|
||||
class TypeModellingFingerPrinter(
|
||||
private val customTypeDescriptorLookup: CustomSerializerRegistry,
|
||||
private val classLoader: ClassLoader,
|
||||
private val debugEnabled: Boolean = false) : FingerPrinter {
|
||||
|
||||
private val cache: MutableMap<TypeIdentifier, String> = DefaultCacheProvider.createCache()
|
||||
@ -42,7 +42,7 @@ class TypeModellingFingerPrinter(
|
||||
* the Fingerprinter cannot guarantee that.
|
||||
*/
|
||||
cache.getOrPut(typeInformation.typeIdentifier) {
|
||||
FingerPrintingState(customTypeDescriptorLookup, FingerprintWriter(debugEnabled))
|
||||
FingerPrintingState(customTypeDescriptorLookup, classLoader, FingerprintWriter(debugEnabled))
|
||||
.fingerprint(typeInformation)
|
||||
}
|
||||
}
|
||||
@ -95,6 +95,7 @@ internal class FingerprintWriter(debugEnabled: Boolean = false) {
|
||||
*/
|
||||
private class FingerPrintingState(
|
||||
private val customSerializerRegistry: CustomSerializerRegistry,
|
||||
private val classLoader: ClassLoader,
|
||||
private val writer: FingerprintWriter) {
|
||||
|
||||
companion object {
|
||||
@ -200,7 +201,7 @@ private class FingerPrintingState(
|
||||
private fun fingerprintName(type: LocalTypeInformation) {
|
||||
val identifier = type.typeIdentifier
|
||||
when (identifier) {
|
||||
is TypeIdentifier.ArrayOf -> writer.write(identifier.componentType.name).writeArray()
|
||||
is ArrayOf -> writer.write(identifier.componentType.name).writeArray()
|
||||
else -> writer.write(identifier.name)
|
||||
}
|
||||
}
|
||||
@ -239,7 +240,7 @@ private class FingerPrintingState(
|
||||
val observedGenericType = if (observedType !is ParameterizedType
|
||||
&& type.typeIdentifier is Parameterised
|
||||
&& observedClass != Class::class.java) {
|
||||
type.typeIdentifier.getLocalType(classLoaderFor(observedClass))
|
||||
type.typeIdentifier.getLocalType(classLoader)
|
||||
} else {
|
||||
observedType
|
||||
}
|
||||
@ -259,6 +260,5 @@ private class FingerPrintingState(
|
||||
// and deserializing (assuming deserialization is occurring in a factory that didn't
|
||||
// serialise the object in the first place (and thus the cache lookup fails). This is also
|
||||
// true of Any, where we need Example<A, B> and Example<?, ?> to have the same fingerprint
|
||||
private fun hasSeen(type: TypeIdentifier) = (type in typesSeen)
|
||||
&& (type != TypeIdentifier.UnknownType)
|
||||
private fun hasSeen(type: TypeIdentifier) = (type in typesSeen) && (type != UnknownType)
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ class TypeModellingFingerPrinterTests {
|
||||
|
||||
val descriptorBasedSerializerRegistry = DefaultDescriptorBasedSerializerRegistry()
|
||||
val customRegistry = CachingCustomSerializerRegistry(descriptorBasedSerializerRegistry)
|
||||
val fingerprinter = TypeModellingFingerPrinter(customRegistry, true)
|
||||
val fingerprinter = TypeModellingFingerPrinter(customRegistry, ClassLoader.getSystemClassLoader(), true)
|
||||
|
||||
// See https://r3-cev.atlassian.net/browse/CORDA-2266
|
||||
@Test(timeout=300_000)
|
||||
|
Loading…
Reference in New Issue
Block a user