mirror of
https://github.com/corda/corda.git
synced 2025-06-23 17:53:31 +00:00
Fix and or suppress detekt warnings
This commit is contained in:
committed by
LankyDan
parent
48aa2f2faa
commit
119f939ee1
@ -21,6 +21,10 @@ class NotaryException(
|
|||||||
/** Specifies the cause for notarisation request failure. */
|
/** Specifies the cause for notarisation request failure. */
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
sealed class NotaryError {
|
sealed class NotaryError {
|
||||||
|
companion object {
|
||||||
|
const val NUM_STATES = 5
|
||||||
|
}
|
||||||
|
|
||||||
/** Occurs when one or more input states have already been consumed by another transaction. */
|
/** Occurs when one or more input states have already been consumed by another transaction. */
|
||||||
data class Conflict(
|
data class Conflict(
|
||||||
/** Id of the transaction that was attempted to be notarised. */
|
/** Id of the transaction that was attempted to be notarised. */
|
||||||
@ -28,8 +32,9 @@ sealed class NotaryError {
|
|||||||
/** Specifies which states have already been consumed in another transaction. */
|
/** Specifies which states have already been consumed in another transaction. */
|
||||||
val consumedStates: Map<StateRef, StateConsumptionDetails>
|
val consumedStates: Map<StateRef, StateConsumptionDetails>
|
||||||
) : NotaryError() {
|
) : NotaryError() {
|
||||||
override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" +
|
override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. " +
|
||||||
"${consumedStates.asSequence().joinToString(",\n", limit = 5) { it.key.toString() + " -> " + it.value }}.\n" +
|
"Conflicting state count: ${consumedStates.size}, consumption details:\n" +
|
||||||
|
"${consumedStates.asSequence().joinToString(",\n", limit = NUM_STATES) { it.key.toString() + " -> " + it.value }}.\n" +
|
||||||
"To find out if any of the conflicting transactions have been generated by this node you can use the hashLookup Corda shell command."
|
"To find out if any of the conflicting transactions have been generated by this node you can use the hashLookup Corda shell command."
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +92,7 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
|
|||||||
/**
|
/**
|
||||||
* VaultQueryCriteria: provides query by attributes defined in [VaultSchema.VaultStates]
|
* VaultQueryCriteria: provides query by attributes defined in [VaultSchema.VaultStates]
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MagicNumber") // need to list deprecation versions explicitly
|
||||||
data class VaultQueryCriteria(
|
data class VaultQueryCriteria(
|
||||||
override val status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
|
override val status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
|
||||||
override val contractStateTypes: Set<Class<out ContractState>>? = null,
|
override val contractStateTypes: Set<Class<out ContractState>>? = null,
|
||||||
@ -264,6 +265,7 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
|
|||||||
/**
|
/**
|
||||||
* LinearStateQueryCriteria: provides query by attributes defined in [VaultSchema.VaultLinearState]
|
* LinearStateQueryCriteria: provides query by attributes defined in [VaultSchema.VaultLinearState]
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MagicNumber") // need to list deprecation versions explicitly
|
||||||
data class LinearStateQueryCriteria(
|
data class LinearStateQueryCriteria(
|
||||||
override val participants: List<AbstractParty>? = null,
|
override val participants: List<AbstractParty>? = null,
|
||||||
val uuid: List<UUID>? = null,
|
val uuid: List<UUID>? = null,
|
||||||
@ -545,6 +547,7 @@ sealed class AttachmentQueryCriteria : GenericQueryCriteria<AttachmentQueryCrite
|
|||||||
/**
|
/**
|
||||||
* AttachmentsQueryCriteria:
|
* AttachmentsQueryCriteria:
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MagicNumber") // need to list deprecation versions explicitly
|
||||||
data class AttachmentsQueryCriteria(val uploaderCondition: ColumnPredicate<String>? = null,
|
data class AttachmentsQueryCriteria(val uploaderCondition: ColumnPredicate<String>? = null,
|
||||||
val filenameCondition: ColumnPredicate<String>? = null,
|
val filenameCondition: ColumnPredicate<String>? = null,
|
||||||
val uploadDateCondition: ColumnPredicate<Instant>? = null,
|
val uploadDateCondition: ColumnPredicate<Instant>? = null,
|
||||||
|
@ -90,7 +90,9 @@ class PersistentState(@EmbeddedId override var stateRef: PersistentStateRef? = n
|
|||||||
@KeepForDJVM
|
@KeepForDJVM
|
||||||
@Embeddable
|
@Embeddable
|
||||||
@Immutable
|
@Immutable
|
||||||
|
|
||||||
data class PersistentStateRef(
|
data class PersistentStateRef(
|
||||||
|
@Suppress("MagicNumber") // column width
|
||||||
@Column(name = "transaction_id", length = 64, nullable = false)
|
@Column(name = "transaction_id", length = 64, nullable = false)
|
||||||
var txId: String,
|
var txId: String,
|
||||||
|
|
||||||
|
@ -52,12 +52,7 @@ empty-blocks:
|
|||||||
EmptyClassBlock:
|
EmptyClassBlock:
|
||||||
active: true
|
active: true
|
||||||
EmptyDefaultConstructor:
|
EmptyDefaultConstructor:
|
||||||
active: true
|
|
||||||
EmptyDoWhileBlock:
|
|
||||||
active: true
|
|
||||||
EmptyElseBlock:
|
|
||||||
active: true
|
|
||||||
EmptyFinallyBlock:
|
|
||||||
active: true
|
active: true
|
||||||
EmptyForBlock:
|
EmptyForBlock:
|
||||||
active: true
|
active: true
|
||||||
|
@ -266,6 +266,7 @@ class InitiatorFlow(val arg1: Boolean, val arg2: Int, private val counterparty:
|
|||||||
val ourOutputState: DummyState = DummyState()
|
val ourOutputState: DummyState = DummyState()
|
||||||
// DOCEND 22
|
// DOCEND 22
|
||||||
// Or as copies of other states with some properties changed.
|
// Or as copies of other states with some properties changed.
|
||||||
|
@Suppress("MagicNumber") // literally a magic number
|
||||||
// DOCSTART 23
|
// DOCSTART 23
|
||||||
val ourOtherOutputState: DummyState = ourOutputState.copy(magicNumber = 77)
|
val ourOtherOutputState: DummyState = ourOutputState.copy(magicNumber = 77)
|
||||||
// DOCEND 23
|
// DOCEND 23
|
||||||
|
@ -18,6 +18,7 @@ object CashSchema
|
|||||||
* First version of a cash contract ORM schema that maps all fields of the [Cash] contract state as it stood
|
* First version of a cash contract ORM schema that maps all fields of the [Cash] contract state as it stood
|
||||||
* at the time of writing.
|
* at the time of writing.
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MagicNumber") // SQL column length
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
object CashSchemaV1 : MappedSchema(schemaFamily = CashSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCashState::class.java)) {
|
object CashSchemaV1 : MappedSchema(schemaFamily = CashSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCashState::class.java)) {
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ object CommercialPaperSchema
|
|||||||
* as it stood at the time of writing.
|
* as it stood at the time of writing.
|
||||||
*/
|
*/
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
|
@Suppress("MagicNumber") // SQL column length
|
||||||
object CommercialPaperSchemaV1 : MappedSchema(schemaFamily = CommercialPaperSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCommercialPaperState::class.java)) {
|
object CommercialPaperSchemaV1 : MappedSchema(schemaFamily = CommercialPaperSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCommercialPaperState::class.java)) {
|
||||||
|
|
||||||
override val migrationResource = "commercial-paper.changelog-master"
|
override val migrationResource = "commercial-paper.changelog-master"
|
||||||
|
@ -398,7 +398,9 @@ class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogi
|
|||||||
initiateFlow(party).send("hello there")
|
initiateFlow(party).send("hello there")
|
||||||
// checkpoint will restart the flow after the send
|
// checkpoint will restart the flow after the send
|
||||||
retryCount += 1
|
retryCount += 1
|
||||||
throw IllegalStateException("wrapped error message", IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")/*.fillInStackTrace()*/))
|
throw IllegalStateException(
|
||||||
|
"wrapped error message",
|
||||||
|
IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,6 +58,7 @@ import kotlin.test.assertEquals
|
|||||||
import kotlin.test.assertFailsWith
|
import kotlin.test.assertFailsWith
|
||||||
import kotlin.test.assertTrue
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
|
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
|
||||||
class StatemachineErrorHandlingTest : IntegrationTest() {
|
class StatemachineErrorHandlingTest : IntegrationTest() {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
@ -129,11 +130,11 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,10 +210,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -230,8 +231,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throws an exception when executing [DeduplicationHandler.afterDatabaseTransaction] from inside an [Action.AcknowledgeMessages] action.
|
* Throws an exception when executing [DeduplicationHandler.afterDatabaseTransaction] from
|
||||||
* The exception is thrown every time [DeduplicationHandler.afterDatabaseTransaction] is executed inside of [ActionExecutorImpl.executeAcknowledgeMessages]
|
* inside an [Action.AcknowledgeMessages] action.
|
||||||
|
* The exception is thrown every time [DeduplicationHandler.afterDatabaseTransaction] is executed
|
||||||
|
* inside of [ActionExecutorImpl.executeAcknowledgeMessages]
|
||||||
*
|
*
|
||||||
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
|
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
|
||||||
* The flow should complete successfully as the error is swallowed.
|
* The flow should complete successfully as the error is swallowed.
|
||||||
@ -287,10 +290,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -308,7 +311,8 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted state)..
|
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||||
|
* state).
|
||||||
* The exception is thrown 5 times.
|
* The exception is thrown 5 times.
|
||||||
*
|
*
|
||||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
|
||||||
@ -317,8 +321,8 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
||||||
*
|
*
|
||||||
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
||||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
|
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to
|
||||||
* that 3 retries are attempted before recovering.
|
* verify that 3 retries are attempted before recovering.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
|
||||||
@ -371,10 +375,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -392,7 +396,8 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted state)..
|
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
|
||||||
|
* state).
|
||||||
* The exception is thrown 7 times.
|
* The exception is thrown 7 times.
|
||||||
*
|
*
|
||||||
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
|
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
|
||||||
@ -400,8 +405,8 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
|
||||||
*
|
*
|
||||||
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
||||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
|
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to
|
||||||
* that 3 retries are attempted before recovering.
|
* verify that 3 retries are attempted before recovering.
|
||||||
*
|
*
|
||||||
* CORDA-3352 - it is currently hanging after putting the flow in for observation
|
* CORDA-3352 - it is currently hanging after putting the flow in for observation
|
||||||
*/
|
*/
|
||||||
@ -457,11 +462,11 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,8 +494,8 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
|
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
|
||||||
*
|
*
|
||||||
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
* 2 of the thrown exceptions are absorbed by the if statement in [TransitionExecutorImpl.executeTransition] that aborts the transition
|
||||||
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
|
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to
|
||||||
* that 3 retries are attempted before recovering.
|
* verify that 3 retries are attempted before recovering.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
|
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
|
||||||
@ -560,10 +565,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -649,10 +654,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -745,11 +750,11 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -845,10 +850,10 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(alice)
|
val output = getBytemanOutput(alice)
|
||||||
@ -929,11 +934,11 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1021,11 +1026,11 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1109,12 +1114,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(charlie)
|
val output = getBytemanOutput(charlie)
|
||||||
@ -1204,13 +1209,13 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1303,12 +1308,12 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
|
||||||
30.seconds
|
30.seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
val output = getBytemanOutput(charlie)
|
val output = getBytemanOutput(charlie)
|
||||||
@ -1374,17 +1379,17 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(
|
aliceClient.startFlow(
|
||||||
::CashIssueAndPaymentFlow,
|
::CashIssueAndPaymentFlow,
|
||||||
500.DOLLARS,
|
500.DOLLARS,
|
||||||
OpaqueBytes.of(0x01),
|
OpaqueBytes.of(0x01),
|
||||||
charlie.nodeInfo.singleIdentity(),
|
charlie.nodeInfo.singleIdentity(),
|
||||||
false,
|
false,
|
||||||
defaultNotaryIdentity
|
defaultNotaryIdentity
|
||||||
).returnValue.getOrThrow(30.seconds)
|
).returnValue.getOrThrow(30.seconds)
|
||||||
|
|
||||||
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
|
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
|
||||||
@ -1445,17 +1450,17 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(
|
aliceClient.startFlow(
|
||||||
::CashIssueAndPaymentFlow,
|
::CashIssueAndPaymentFlow,
|
||||||
500.DOLLARS,
|
500.DOLLARS,
|
||||||
OpaqueBytes.of(0x01),
|
OpaqueBytes.of(0x01),
|
||||||
charlie.nodeInfo.singleIdentity(),
|
charlie.nodeInfo.singleIdentity(),
|
||||||
false,
|
false,
|
||||||
defaultNotaryIdentity
|
defaultNotaryIdentity
|
||||||
).returnValue.getOrThrow(30.seconds)
|
).returnValue.getOrThrow(30.seconds)
|
||||||
|
|
||||||
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
|
val (discharge, observation) = charlieClient.startFlow(::GetHospitalCountersFlow).returnValue.get()
|
||||||
@ -1532,17 +1537,17 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
aliceClient.startFlow(
|
aliceClient.startFlow(
|
||||||
::CashIssueAndPaymentFlow,
|
::CashIssueAndPaymentFlow,
|
||||||
500.DOLLARS,
|
500.DOLLARS,
|
||||||
OpaqueBytes.of(0x01),
|
OpaqueBytes.of(0x01),
|
||||||
charlie.nodeInfo.singleIdentity(),
|
charlie.nodeInfo.singleIdentity(),
|
||||||
false,
|
false,
|
||||||
defaultNotaryIdentity
|
defaultNotaryIdentity
|
||||||
).returnValue.getOrThrow(30.seconds)
|
).returnValue.getOrThrow(30.seconds)
|
||||||
|
|
||||||
val output = getBytemanOutput(charlie)
|
val output = getBytemanOutput(charlie)
|
||||||
@ -1627,18 +1632,18 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
val charlieClient =
|
val charlieClient =
|
||||||
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
aliceClient.startFlow(
|
aliceClient.startFlow(
|
||||||
::CashIssueAndPaymentFlow,
|
::CashIssueAndPaymentFlow,
|
||||||
500.DOLLARS,
|
500.DOLLARS,
|
||||||
OpaqueBytes.of(0x01),
|
OpaqueBytes.of(0x01),
|
||||||
charlie.nodeInfo.singleIdentity(),
|
charlie.nodeInfo.singleIdentity(),
|
||||||
false,
|
false,
|
||||||
defaultNotaryIdentity
|
defaultNotaryIdentity
|
||||||
).returnValue.getOrThrow(30.seconds)
|
).returnValue.getOrThrow(30.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1708,7 +1713,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
val flow = aliceClient.startTrackedFlow(::SleepFlow)
|
val flow = aliceClient.startTrackedFlow(::SleepFlow)
|
||||||
|
|
||||||
@ -1794,7 +1799,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
val flow = aliceClient.startTrackedFlow(::ThreadSleepFlow)
|
val flow = aliceClient.startTrackedFlow(::ThreadSleepFlow)
|
||||||
|
|
||||||
@ -1894,7 +1899,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
submitBytemanRules(rules)
|
submitBytemanRules(rules)
|
||||||
|
|
||||||
val aliceClient =
|
val aliceClient =
|
||||||
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
|
||||||
|
|
||||||
val flow = aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
|
val flow = aliceClient.startFlow(::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
|
||||||
|
|
||||||
@ -1920,38 +1925,38 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
|
|
||||||
private fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
|
private fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
|
||||||
driver(
|
driver(
|
||||||
DriverParameters(
|
DriverParameters(
|
||||||
notarySpecs = listOf(notarySpec),
|
notarySpecs = listOf(notarySpec),
|
||||||
startNodesInProcess = false,
|
startNodesInProcess = false,
|
||||||
inMemoryDB = false,
|
inMemoryDB = false,
|
||||||
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
|
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
|
||||||
)
|
)
|
||||||
) {
|
) {
|
||||||
dsl()
|
dsl()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun DriverDSL.createBytemanNode(
|
private fun DriverDSL.createBytemanNode(
|
||||||
providedName: CordaX500Name,
|
providedName: CordaX500Name,
|
||||||
additionalCordapps: Collection<TestCordapp> = emptyList()
|
additionalCordapps: Collection<TestCordapp> = emptyList()
|
||||||
): NodeHandle {
|
): NodeHandle {
|
||||||
return (this as InternalDriverDSL).startNode(
|
return (this as InternalDriverDSL).startNode(
|
||||||
NodeParameters(
|
NodeParameters(
|
||||||
providedName = providedName,
|
providedName = providedName,
|
||||||
rpcUsers = listOf(rpcUser),
|
rpcUsers = listOf(rpcUser),
|
||||||
additionalCordapps = additionalCordapps
|
additionalCordapps = additionalCordapps
|
||||||
),
|
),
|
||||||
bytemanPort = 12000
|
bytemanPort = 12000
|
||||||
).getOrThrow()
|
).getOrThrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
|
private fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
|
||||||
return startNode(
|
return startNode(
|
||||||
NodeParameters(
|
NodeParameters(
|
||||||
providedName = providedName,
|
providedName = providedName,
|
||||||
rpcUsers = listOf(rpcUser),
|
rpcUsers = listOf(rpcUser),
|
||||||
additionalCordapps = additionalCordapps
|
additionalCordapps = additionalCordapps
|
||||||
)
|
)
|
||||||
).getOrThrow()
|
).getOrThrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1962,9 +1967,9 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
|
|||||||
|
|
||||||
private fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
|
private fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
|
||||||
return nodeHandle.baseDirectory
|
return nodeHandle.baseDirectory
|
||||||
.list()
|
.list()
|
||||||
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
|
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
|
||||||
.readAllLines()
|
.readAllLines()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +38,11 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME)
|
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME)
|
||||||
|
|
||||||
val log = contextLogger()
|
val log = contextLogger()
|
||||||
|
|
||||||
|
private fun testCordapps() = listOf(
|
||||||
|
findCordapp("com.r3.dbfailure.contracts"),
|
||||||
|
findCordapp("com.r3.dbfailure.workflows"),
|
||||||
|
findCordapp("com.r3.dbfailure.schemas"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -68,11 +73,14 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
aliceNode.rpc.startFlow(::Initiator, "Syntax Error in Custom SQL", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError))
|
aliceNode.rpc.startFlow(
|
||||||
.returnValue.then { testControlFuture.complete(false) }
|
::Initiator,
|
||||||
|
"Syntax Error in Custom SQL",
|
||||||
|
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
|
||||||
|
).returnValue.then { testControlFuture.complete(false) }
|
||||||
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
||||||
|
|
||||||
Assert.assertTrue(foundExpectedException)
|
Assert.assertTrue(foundExpectedException)
|
||||||
@ -87,12 +95,15 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
fun otherExceptionsFromVaultObserverBringFlowDown() {
|
fun otherExceptionsFromVaultObserverBringFlowDown() {
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
assertFailsWith(CordaRuntimeException::class, "Toys out of pram") {
|
assertFailsWith(CordaRuntimeException::class, "Toys out of pram") {
|
||||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter))
|
aliceNode.rpc.startFlow(
|
||||||
.returnValue.getOrThrow(30.seconds)
|
::Initiator,
|
||||||
|
"InvalidParameterException",
|
||||||
|
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter)
|
||||||
|
).returnValue.getOrThrow(30.seconds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -105,7 +116,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
|
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
||||||
@ -137,7 +148,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
assertFailsWith<TimeoutException> {
|
assertFailsWith<TimeoutException> {
|
||||||
@ -181,7 +192,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
assertFailsWith<TimeoutException>("ConstraintViolationException") {
|
assertFailsWith<TimeoutException>("ConstraintViolationException") {
|
||||||
@ -220,10 +231,16 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceValidUpdate, CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
val flowHandle = aliceNode.rpc.startFlow(
|
||||||
|
::Initiator,
|
||||||
|
"EntityManager",
|
||||||
|
CreateStateFlow.errorTargetsToNum(
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
||||||
|
CreateStateFlow.ErrorTarget.TxInvalidState,
|
||||||
|
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||||
val flowResult = flowHandle.returnValue
|
val flowResult = flowHandle.returnValue
|
||||||
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
||||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||||
@ -252,13 +269,15 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
val flowHandle = aliceNode.rpc.startFlow(
|
||||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
::Initiator, "EntityManager",
|
||||||
CreateStateFlow.ErrorTarget.TxInvalidState,
|
CreateStateFlow.errorTargetsToNum(
|
||||||
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
CreateStateFlow.ErrorTarget.ServiceValidUpdate,
|
||||||
|
CreateStateFlow.ErrorTarget.TxInvalidState,
|
||||||
|
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
||||||
val flowResult = flowHandle.returnValue
|
val flowResult = flowHandle.returnValue
|
||||||
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
assertFailsWith<TimeoutException>("ConstraintViolation") { flowResult.getOrThrow(30.seconds) }
|
||||||
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
Assert.assertTrue("Flow has not been to hospital", counter > 0)
|
||||||
@ -280,7 +299,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
|
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||||
@ -303,7 +322,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
|
|||||||
fun constraintViolationInUserCodeInServiceCanBeSuppressedInService() {
|
fun constraintViolationInUserCodeInServiceCanBeSuppressedInService() {
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(
|
||||||
startNodesInProcess = true,
|
startNodesInProcess = true,
|
||||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), findCordapp("com.r3.dbfailure.workflows"), findCordapp("com.r3.dbfailure.schemas")))) {
|
cordappsForAllNodes = testCordapps())) {
|
||||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||||
|
@ -1140,9 +1140,16 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
|
|||||||
@Suppress("DEPRECATION")
|
@Suppress("DEPRECATION")
|
||||||
org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||||
val attributeConverters = listOf(PublicKeyToTextConverter(), AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
val attributeConverters = listOf(PublicKeyToTextConverter(), AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
|
||||||
|
|
||||||
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
|
||||||
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader, errorHandler = {t ->
|
return CordaPersistence(
|
||||||
FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t))
|
databaseConfig,
|
||||||
|
schemaService.schemaOptions.keys,
|
||||||
|
jdbcUrl,
|
||||||
|
cacheFactory,
|
||||||
|
attributeConverters, customClassLoader,
|
||||||
|
errorHandler = { t ->
|
||||||
|
FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ object NodeInfoSchemaV1 : MappedSchema(
|
|||||||
@Column(name = "node_info_id", nullable = false)
|
@Column(name = "node_info_id", nullable = false)
|
||||||
var id: Int,
|
var id: Int,
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column width
|
||||||
@Column(name = "node_info_hash", length = 64, nullable = false)
|
@Column(name = "node_info_hash", length = 64, nullable = false)
|
||||||
val hash: String,
|
val hash: String,
|
||||||
|
|
||||||
|
@ -2,12 +2,16 @@ package net.corda.node.services.identity
|
|||||||
|
|
||||||
import net.corda.core.crypto.Crypto
|
import net.corda.core.crypto.Crypto
|
||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.*
|
import net.corda.core.identity.AbstractParty
|
||||||
|
import net.corda.core.identity.AnonymousParty
|
||||||
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.identity.PartyAndCertificate
|
||||||
|
import net.corda.core.identity.x500Matches
|
||||||
import net.corda.core.internal.CertRole
|
import net.corda.core.internal.CertRole
|
||||||
import net.corda.core.internal.NamedCacheFactory
|
import net.corda.core.internal.NamedCacheFactory
|
||||||
import net.corda.core.internal.hash
|
import net.corda.core.internal.hash
|
||||||
import net.corda.core.internal.toSet
|
import net.corda.core.internal.toSet
|
||||||
import net.corda.core.node.services.IdentityService
|
|
||||||
import net.corda.core.node.services.UnknownAnonymousPartyException
|
import net.corda.core.node.services.UnknownAnonymousPartyException
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||||
@ -29,13 +33,18 @@ import org.hibernate.annotations.Type
|
|||||||
import org.hibernate.internal.util.collections.ArrayHelper.EMPTY_BYTE_ARRAY
|
import org.hibernate.internal.util.collections.ArrayHelper.EMPTY_BYTE_ARRAY
|
||||||
import java.security.InvalidAlgorithmParameterException
|
import java.security.InvalidAlgorithmParameterException
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.security.cert.*
|
import java.security.cert.CertPathValidatorException
|
||||||
|
import java.security.cert.CertStore
|
||||||
|
import java.security.cert.CertificateExpiredException
|
||||||
|
import java.security.cert.CertificateNotYetValidException
|
||||||
|
import java.security.cert.CollectionCertStoreParameters
|
||||||
|
import java.security.cert.TrustAnchor
|
||||||
|
import java.security.cert.X509Certificate
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import javax.persistence.Column
|
import javax.persistence.Column
|
||||||
import javax.persistence.Entity
|
import javax.persistence.Entity
|
||||||
import javax.persistence.Id
|
import javax.persistence.Id
|
||||||
import kotlin.IllegalStateException
|
|
||||||
import kotlin.collections.HashSet
|
import kotlin.collections.HashSet
|
||||||
import kotlin.streams.toList
|
import kotlin.streams.toList
|
||||||
|
|
||||||
@ -147,6 +156,7 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
|
|||||||
@javax.persistence.Table(name = NAME_TO_HASH_TABLE_NAME)
|
@javax.persistence.Table(name = NAME_TO_HASH_TABLE_NAME)
|
||||||
class PersistentPartyToPublicKeyHash(
|
class PersistentPartyToPublicKeyHash(
|
||||||
@Id
|
@Id
|
||||||
|
@Suppress("MagicNumber") // database column width
|
||||||
@Column(name = NAME_COLUMN_NAME, length = 128, nullable = false)
|
@Column(name = NAME_COLUMN_NAME, length = 128, nullable = false)
|
||||||
var name: String = "",
|
var name: String = "",
|
||||||
|
|
||||||
|
@ -85,6 +85,7 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Entity
|
@Entity
|
||||||
|
@Suppress("MagicNumber") // database column width
|
||||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
|
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
|
||||||
class ProcessedMessage(
|
class ProcessedMessage(
|
||||||
@Id
|
@Id
|
||||||
|
@ -29,6 +29,7 @@ class DBCheckpointStorage : CheckpointStorage {
|
|||||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
|
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
|
||||||
class DBCheckpoint(
|
class DBCheckpoint(
|
||||||
@Id
|
@Id
|
||||||
|
@Suppress("MagicNumber") // database column width
|
||||||
@Column(name = "checkpoint_id", length = 64, nullable = false)
|
@Column(name = "checkpoint_id", length = 64, nullable = false)
|
||||||
var checkpointId: String = "",
|
var checkpointId: String = "",
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import kotlin.streams.toList
|
|||||||
|
|
||||||
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column width
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
|
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
|
||||||
class DBTransaction(
|
class DBTransaction(
|
||||||
|
@ -120,6 +120,7 @@ class ActionExecutorImpl(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause
|
||||||
@Suspendable
|
@Suspendable
|
||||||
private fun executeAcknowledgeMessages(action: Action.AcknowledgeMessages) {
|
private fun executeAcknowledgeMessages(action: Action.AcknowledgeMessages) {
|
||||||
action.deduplicationHandlers.forEach {
|
action.deduplicationHandlers.forEach {
|
||||||
@ -130,8 +131,8 @@ class ActionExecutorImpl(
|
|||||||
// It is deemed safe for errors to occur here
|
// It is deemed safe for errors to occur here
|
||||||
// Therefore the current transition should not fail if something does go wrong
|
// Therefore the current transition should not fail if something does go wrong
|
||||||
log.info(
|
log.info(
|
||||||
"An error occurred executing a deduplication post-database commit handler. Continuing, as it is safe to do so.",
|
"An error occurred executing a deduplication post-database commit handler. Continuing, as it is safe to do so.",
|
||||||
e
|
e
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -228,17 +229,18 @@ class ActionExecutorImpl(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause
|
||||||
@Suspendable
|
@Suspendable
|
||||||
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
|
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
|
||||||
try {
|
try {
|
||||||
val operationFuture = action.operation.execute(action.deduplicationId)
|
val operationFuture = action.operation.execute(action.deduplicationId)
|
||||||
operationFuture.thenMatch(
|
operationFuture.thenMatch(
|
||||||
success = { result ->
|
success = { result ->
|
||||||
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
|
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
|
||||||
},
|
},
|
||||||
failure = { exception ->
|
failure = { exception ->
|
||||||
fiber.scheduleEvent(Event.Error(exception))
|
fiber.scheduleEvent(Event.Error(exception))
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
// Catch and wrap any unexpected exceptions from the async operation
|
// Catch and wrap any unexpected exceptions from the async operation
|
||||||
|
@ -7,7 +7,11 @@ import net.corda.core.flows.ReceiveTransactionFlow
|
|||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.flows.UnexpectedFlowEndException
|
import net.corda.core.flows.UnexpectedFlowEndException
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.DeclaredField
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
|
import net.corda.core.internal.TimedFlow
|
||||||
|
import net.corda.core.internal.VisibleForTesting
|
||||||
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
@ -165,6 +169,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("ComplexMethod")
|
||||||
private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
||||||
val time = clock.instant()
|
val time = clock.instant()
|
||||||
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
||||||
@ -427,8 +432,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
val strippedStacktrace = error.stackTrace
|
val strippedStacktrace = error.stackTrace
|
||||||
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
|
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
|
||||||
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
|
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
|
||||||
return strippedStacktrace.isNotEmpty() &&
|
return strippedStacktrace.isNotEmpty()
|
||||||
strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! )
|
&& strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -436,7 +441,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
* [SQLTransientConnectionException] detection that arise from failing to connect the underlying database/datasource
|
* [SQLTransientConnectionException] detection that arise from failing to connect the underlying database/datasource
|
||||||
*/
|
*/
|
||||||
object TransientConnectionCardiologist : Staff {
|
object TransientConnectionCardiologist : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
override fun consult(
|
||||||
|
flowFiber: FlowFiber,
|
||||||
|
currentState: StateMachineState,
|
||||||
|
newError: Throwable,
|
||||||
|
history: FlowMedicalHistory
|
||||||
|
): Diagnosis {
|
||||||
return if (mentionsTransientConnection(newError)) {
|
return if (mentionsTransientConnection(newError)) {
|
||||||
if (history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
|
if (history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
|
||||||
Diagnosis.DISCHARGE
|
Diagnosis.DISCHARGE
|
||||||
@ -458,7 +468,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
* Note that retry decisions from other specialists will not be affected as retries take precedence over hospitalisation.
|
* Note that retry decisions from other specialists will not be affected as retries take precedence over hospitalisation.
|
||||||
*/
|
*/
|
||||||
object DatabaseEndocrinologist : Staff {
|
object DatabaseEndocrinologist : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
override fun consult(
|
||||||
|
flowFiber: FlowFiber,
|
||||||
|
currentState: StateMachineState,
|
||||||
|
newError: Throwable,
|
||||||
|
history: FlowMedicalHistory
|
||||||
|
): Diagnosis {
|
||||||
return if ((newError is SQLException || newError is PersistenceException) && !customConditions.any { it(newError) }) {
|
return if ((newError is SQLException || newError is PersistenceException) && !customConditions.any { it(newError) }) {
|
||||||
Diagnosis.OVERNIGHT_OBSERVATION
|
Diagnosis.OVERNIGHT_OBSERVATION
|
||||||
} else {
|
} else {
|
||||||
@ -475,8 +490,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
*
|
*
|
||||||
* [InterruptedException]s are diagnosed as [Diagnosis.TERMINAL] so they are never retried
|
* [InterruptedException]s are diagnosed as [Diagnosis.TERMINAL] so they are never retried
|
||||||
* (can occur when a flow is killed - `killFlow`).
|
* (can occur when a flow is killed - `killFlow`).
|
||||||
* [AsyncOperationTransitionException]s ares ignored as the error is likely to have originated in user async code rather than inside of a
|
* [AsyncOperationTransitionException]s ares ignored as the error is likely to have originated in user async code rather than inside
|
||||||
* transition.
|
* of a transition.
|
||||||
* All other exceptions are retried a maximum of 3 times before being kept in for observation.
|
* All other exceptions are retried a maximum of 3 times before being kept in for observation.
|
||||||
*/
|
*/
|
||||||
object TransitionErrorGeneralPractitioner : Staff {
|
object TransitionErrorGeneralPractitioner : Staff {
|
||||||
@ -495,21 +510,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Diagnosis.NOT_MY_SPECIALTY
|
Diagnosis.NOT_MY_SPECIALTY
|
||||||
}.also { diagnosis ->
|
}.also { logDiagnosis(it, newError, flowFiber, history) }
|
||||||
if (diagnosis != Diagnosis.NOT_MY_SPECIALTY) {
|
}
|
||||||
log.debug {
|
|
||||||
"""
|
private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) {
|
||||||
|
if (diagnosis != Diagnosis.NOT_MY_SPECIALTY) {
|
||||||
|
log.debug {
|
||||||
|
"""
|
||||||
Flow ${flowFiber.id} given $diagnosis diagnosis due to a transition error
|
Flow ${flowFiber.id} given $diagnosis diagnosis due to a transition error
|
||||||
- Exception: ${newError.message}
|
- Exception: ${newError.message}
|
||||||
- History: $history
|
- History: $history
|
||||||
${(newError as? StateTransitionException)?.transitionAction?.let { "- Action: $it" }}
|
${(newError as? StateTransitionException)?.transitionAction?.let { "- Action: $it" }}
|
||||||
${(newError as? StateTransitionException)?.transitionEvent?.let { "- Event: $it" }}
|
${(newError as? StateTransitionException)?.transitionEvent?.let { "- Event: $it" }}
|
||||||
""".trimIndent()
|
""".trimIndent()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
|
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
|
||||||
|
@ -28,6 +28,7 @@ class TransitionExecutorImpl(
|
|||||||
val log = contextLogger()
|
val log = contextLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("NestedBlockDepth", "ReturnCount")
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun executeTransition(
|
override fun executeTransition(
|
||||||
fiber: FlowFiber,
|
fiber: FlowFiber,
|
||||||
|
@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap
|
|||||||
* This interceptor records a trace of all of the flows' states and transitions. If the flow dirties it dumps the trace
|
* This interceptor records a trace of all of the flows' states and transitions. If the flow dirties it dumps the trace
|
||||||
* transition to the logger.
|
* transition to the logger.
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MaxLineLength") // detekt confusing the whole if statement for a line
|
||||||
class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
|
class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
@ -34,7 +35,8 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
|
|||||||
transition: TransitionResult,
|
transition: TransitionResult,
|
||||||
actionExecutor: ActionExecutor
|
actionExecutor: ActionExecutor
|
||||||
): Pair<FlowContinuation, StateMachineState> {
|
): Pair<FlowContinuation, StateMachineState> {
|
||||||
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
|
val (continuation, nextState)
|
||||||
|
= delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
|
||||||
|
|
||||||
if (!previousState.isRemoved) {
|
if (!previousState.isRemoved) {
|
||||||
val transitionRecord =
|
val transitionRecord =
|
||||||
|
@ -2,7 +2,13 @@ package net.corda.node.services.statemachine.interceptors
|
|||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.node.services.statemachine.*
|
import net.corda.node.services.statemachine.ActionExecutor
|
||||||
|
import net.corda.node.services.statemachine.ErrorState
|
||||||
|
import net.corda.node.services.statemachine.Event
|
||||||
|
import net.corda.node.services.statemachine.FlowFiber
|
||||||
|
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||||
|
import net.corda.node.services.statemachine.StateMachineState
|
||||||
|
import net.corda.node.services.statemachine.TransitionExecutor
|
||||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||||
import net.corda.node.services.statemachine.transitions.TransitionResult
|
import net.corda.node.services.statemachine.transitions.TransitionResult
|
||||||
|
|
||||||
|
@ -72,6 +72,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
|||||||
var requestDate: Instant
|
var requestDate: Instant
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column length
|
||||||
@Entity
|
@Entity
|
||||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_txs")
|
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_txs")
|
||||||
class CommittedTransaction(
|
class CommittedTransaction(
|
||||||
|
@ -12,6 +12,7 @@ import javax.persistence.Entity
|
|||||||
import javax.persistence.Id
|
import javax.persistence.Id
|
||||||
import javax.persistence.Table
|
import javax.persistence.Table
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column length
|
||||||
class ContractUpgradeServiceImpl(cacheFactory: NamedCacheFactory) : ContractUpgradeService, SingletonSerializeAsToken() {
|
class ContractUpgradeServiceImpl(cacheFactory: NamedCacheFactory) : ContractUpgradeService, SingletonSerializeAsToken() {
|
||||||
|
|
||||||
@Entity
|
@Entity
|
||||||
|
@ -394,7 +394,9 @@ class NodeVaultService(
|
|||||||
try {
|
try {
|
||||||
updatesPublisher.onNext(vaultUpdate)
|
updatesPublisher.onNext(vaultUpdate)
|
||||||
} catch (e: OnErrorNotImplementedException) {
|
} catch (e: OnErrorNotImplementedException) {
|
||||||
log.warn("Caught an Rx.OnErrorNotImplementedException - caused by an exception in an RX observer that was unhandled - the observer has been unsubscribed! The underlying exception will be rethrown.", e)
|
log.warn("Caught an Rx.OnErrorNotImplementedException " +
|
||||||
|
"- caused by an exception in an RX observer that was unhandled " +
|
||||||
|
"- the observer has been unsubscribed! The underlying exception will be rethrown.", e)
|
||||||
// if the observer code threw, unwrap their exception from the RX wrapper
|
// if the observer code threw, unwrap their exception from the RX wrapper
|
||||||
throw e.cause ?: e
|
throw e.cause ?: e
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ object VaultSchema
|
|||||||
/**
|
/**
|
||||||
* First version of the Vault ORM schema
|
* First version of the Vault ORM schema
|
||||||
*/
|
*/
|
||||||
|
@Suppress("MagicNumber") // database column length
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
object VaultSchemaV1 : MappedSchema(
|
object VaultSchemaV1 : MappedSchema(
|
||||||
schemaFamily = VaultSchema.javaClass,
|
schemaFamily = VaultSchema.javaClass,
|
||||||
|
@ -127,6 +127,7 @@ class BFTSmartNotaryService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column length
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_txs")
|
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_txs")
|
||||||
class CommittedTransaction(
|
class CommittedTransaction(
|
||||||
|
@ -104,6 +104,7 @@ class RaftUniquenessProvider(
|
|||||||
var index: Long = 0
|
var index: Long = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // database column length
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_txs")
|
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_txs")
|
||||||
class CommittedTransaction(
|
class CommittedTransaction(
|
||||||
|
@ -19,7 +19,12 @@ class DbFailureContract : Contract {
|
|||||||
val ID = "com.r3.dbfailure.contracts.DbFailureContract"
|
val ID = "com.r3.dbfailure.contracts.DbFailureContract"
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestState(override val linearId: UniqueIdentifier, val particpant: Party, val randomValue: String?, val errorTarget: Int = 0) : LinearState, QueryableState {
|
class TestState(
|
||||||
|
override val linearId: UniqueIdentifier,
|
||||||
|
val particpant: Party,
|
||||||
|
val randomValue: String?,
|
||||||
|
val errorTarget: Int = 0
|
||||||
|
) : LinearState, QueryableState {
|
||||||
|
|
||||||
override val participants: List<AbstractParty> = listOf(particpant)
|
override val participants: List<AbstractParty> = listOf(particpant)
|
||||||
|
|
||||||
|
@ -9,8 +9,15 @@ import net.corda.core.flows.InitiatingFlow
|
|||||||
import net.corda.core.flows.StartableByRPC
|
import net.corda.core.flows.StartableByRPC
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
|
|
||||||
|
// There is a bit of number fiddling in this class to encode/decode the error target instructions
|
||||||
|
@Suppress("MagicNumber")
|
||||||
object CreateStateFlow {
|
object CreateStateFlow {
|
||||||
|
|
||||||
|
// Encoding of error targets
|
||||||
|
// 1s are errors actions to be taken in the vault listener in the service
|
||||||
|
// 10s are errors caused in the flow
|
||||||
|
// 100s control exception handling in the flow
|
||||||
|
// 1000s control exception handlling in the service/vault listener
|
||||||
enum class ErrorTarget(val targetNumber: Int) {
|
enum class ErrorTarget(val targetNumber: Int) {
|
||||||
NoError(0),
|
NoError(0),
|
||||||
ServiceSqlSyntaxError(1),
|
ServiceSqlSyntaxError(1),
|
||||||
@ -56,7 +63,11 @@ object CreateStateFlow {
|
|||||||
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
||||||
val txTarget = getTxTarget(errorTarget)
|
val txTarget = getTxTarget(errorTarget)
|
||||||
logger.info("Test flow: The tx error target is $txTarget")
|
logger.info("Test flow: The tx error target is $txTarget")
|
||||||
val state = DbFailureContract.TestState(UniqueIdentifier(), ourIdentity, if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, errorTarget)
|
val state = DbFailureContract.TestState(
|
||||||
|
UniqueIdentifier(),
|
||||||
|
ourIdentity,
|
||||||
|
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
|
||||||
|
errorTarget)
|
||||||
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
||||||
|
|
||||||
logger.info("Test flow: tx builder")
|
logger.info("Test flow: tx builder")
|
||||||
@ -69,6 +80,7 @@ object CreateStateFlow {
|
|||||||
|
|
||||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||||
|
|
||||||
|
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions according to config
|
||||||
try {
|
try {
|
||||||
logger.info("Test flow: recording transaction")
|
logger.info("Test flow: recording transaction")
|
||||||
serviceHub.recordTransactions(signedTx)
|
serviceHub.recordTransactions(signedTx)
|
||||||
|
@ -18,6 +18,7 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
services.vaultService.rawUpdates.subscribe { (_, produced) ->
|
services.vaultService.rawUpdates.subscribe { (_, produced) ->
|
||||||
produced.forEach {
|
produced.forEach {
|
||||||
val contractState = it.state.data as? DbFailureContract.TestState
|
val contractState = it.state.data as? DbFailureContract.TestState
|
||||||
|
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
|
||||||
try {
|
try {
|
||||||
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
|
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
|
||||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
|
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
|
||||||
@ -72,7 +73,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||||
)
|
)
|
||||||
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
||||||
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
|
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
|
||||||
|
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
|
||||||
}
|
}
|
||||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
|
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
|
||||||
log.info("Throw InvalidParameterException")
|
log.info("Throw InvalidParameterException")
|
||||||
@ -80,7 +82,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
|
||||||
|
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
||||||
log.warn("Service not letting errors escape", t)
|
log.warn("Service not letting errors escape", t)
|
||||||
} else {
|
} else {
|
||||||
throw t
|
throw t
|
||||||
|
@ -135,6 +135,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // initialising to max value
|
||||||
private fun makeNetworkParametersCopier(config: NodeConfigWrapper): NetworkParametersCopier {
|
private fun makeNetworkParametersCopier(config: NodeConfigWrapper): NetworkParametersCopier {
|
||||||
val identity = getNotaryIdentity(config)
|
val identity = getNotaryIdentity(config)
|
||||||
val parametersCopier = NetworkParametersCopier(NetworkParameters(
|
val parametersCopier = NetworkParametersCopier(NetworkParameters(
|
||||||
|
@ -241,6 +241,7 @@ class NodeTabView : Fragment() {
|
|||||||
CityDatabase.cityMap.values.map { it.countryCode }.toSet().map { it to Image(resources["/net/corda/demobench/flags/$it.png"]) }.toMap()
|
CityDatabase.cityMap.values.map { it.countryCode }.toSet().map { it to Image(resources["/net/corda/demobench/flags/$it.png"]) }.toMap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // demobench UI magic
|
||||||
private fun Pane.nearestCityField(): ComboBox<WorldMapLocation> {
|
private fun Pane.nearestCityField(): ComboBox<WorldMapLocation> {
|
||||||
return combobox(model.nearestCity, CityDatabase.cityMap.values.toList().sortedBy { it.description }) {
|
return combobox(model.nearestCity, CityDatabase.cityMap.values.toList().sortedBy { it.description }) {
|
||||||
minWidth = textWidth
|
minWidth = textWidth
|
||||||
|
@ -70,6 +70,7 @@ fun main(args: Array<String>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // test constants
|
||||||
private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
|
private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
|
||||||
runLoadTests(loadTestConfiguration, listOf(
|
runLoadTests(loadTestConfiguration, listOf(
|
||||||
selfIssueTest to LoadTest.RunParameters(
|
selfIssueTest to LoadTest.RunParameters(
|
||||||
@ -131,6 +132,7 @@ private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // test constants
|
||||||
private fun runStabilityTest(loadTestConfiguration: LoadTestConfiguration) {
|
private fun runStabilityTest(loadTestConfiguration: LoadTestConfiguration) {
|
||||||
runLoadTests(loadTestConfiguration, listOf(
|
runLoadTests(loadTestConfiguration, listOf(
|
||||||
// Self issue cash. This is a pre test step to make sure vault have enough cash to work with.
|
// Self issue cash. This is a pre test step to make sure vault have enough cash to work with.
|
||||||
|
@ -38,6 +38,7 @@ interface Volume {
|
|||||||
nodeInfoFile.readBytes().deserialize<SignedNodeInfo>().verified().let { NotaryInfo(it.legalIdentities.first(), validating) }
|
nodeInfoFile.readBytes().deserialize<SignedNodeInfo>().verified().let { NotaryInfo(it.legalIdentities.first(), validating) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Suppress("MagicNumber") // default config constants
|
||||||
return notaryInfos.let {
|
return notaryInfos.let {
|
||||||
NetworkParameters(
|
NetworkParameters(
|
||||||
minimumPlatformVersion = 1,
|
minimumPlatformVersion = 1,
|
||||||
|
Reference in New Issue
Block a user