CID-1154: reliable finality merge to OS (#5658)

CID-1154: reliable finality merge to OS (#5658)
This commit is contained in:
Jonathan Locke 2019-11-05 10:48:00 +00:00 committed by GitHub
commit c193aa46f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 3371 additions and 186 deletions

View File

@ -21,6 +21,10 @@ class NotaryException(
/** Specifies the cause for notarisation request failure. */
@CordaSerializable
sealed class NotaryError {
companion object {
const val NUM_STATES = 5
}
/** Occurs when one or more input states have already been consumed by another transaction. */
data class Conflict(
/** Id of the transaction that was attempted to be notarised. */
@ -28,8 +32,9 @@ sealed class NotaryError {
/** Specifies which states have already been consumed in another transaction. */
val consumedStates: Map<StateRef, StateConsumptionDetails>
) : NotaryError() {
override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. Conflicting state count: ${consumedStates.size}, consumption details:\n" +
"${consumedStates.asSequence().joinToString(",\n", limit = 5) { it.key.toString() + " -> " + it.value }}.\n" +
override fun toString() = "One or more input states or referenced states have already been used as input states in other transactions. " +
"Conflicting state count: ${consumedStates.size}, consumption details:\n" +
"${consumedStates.asSequence().joinToString(",\n", limit = NUM_STATES) { it.key.toString() + " -> " + it.value }}.\n" +
"To find out if any of the conflicting transactions have been generated by this node you can use the hashLookup Corda shell command."
}

View File

@ -92,6 +92,7 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
/**
* VaultQueryCriteria: provides query by attributes defined in [VaultSchema.VaultStates]
*/
@Suppress("MagicNumber") // need to list deprecation versions explicitly
data class VaultQueryCriteria(
override val status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED,
override val contractStateTypes: Set<Class<out ContractState>>? = null,
@ -264,6 +265,7 @@ sealed class QueryCriteria : GenericQueryCriteria<QueryCriteria, IQueryCriteriaP
/**
* LinearStateQueryCriteria: provides query by attributes defined in [VaultSchema.VaultLinearState]
*/
@Suppress("MagicNumber") // need to list deprecation versions explicitly
data class LinearStateQueryCriteria(
override val participants: List<AbstractParty>? = null,
val uuid: List<UUID>? = null,
@ -545,6 +547,7 @@ sealed class AttachmentQueryCriteria : GenericQueryCriteria<AttachmentQueryCrite
/**
* AttachmentsQueryCriteria:
*/
@Suppress("MagicNumber") // need to list deprecation versions explicitly
data class AttachmentsQueryCriteria(val uploaderCondition: ColumnPredicate<String>? = null,
val filenameCondition: ColumnPredicate<String>? = null,
val uploadDateCondition: ColumnPredicate<Instant>? = null,

View File

@ -90,7 +90,9 @@ class PersistentState(@EmbeddedId override var stateRef: PersistentStateRef? = n
@KeepForDJVM
@Embeddable
@Immutable
data class PersistentStateRef(
@Suppress("MagicNumber") // column width
@Column(name = "transaction_id", length = 64, nullable = false)
var txId: String,

View File

@ -143,8 +143,6 @@
<ID>ComplexMethod:CustomSerializerRegistry.kt$CachingCustomSerializerRegistry$private fun doFindCustomSerializer(clazz: Class&lt;*&gt;, declaredType: Type): AMQPSerializer&lt;Any&gt;?</ID>
<ID>ComplexMethod:DeserializationInput.kt$DeserializationInput$fun readObject(obj: Any, schemas: SerializationSchemas, type: Type, context: SerializationContext): Any</ID>
<ID>ComplexMethod:DriverDSLImpl.kt$DriverDSLImpl$override fun start()</ID>
<ID>ComplexMethod:DriverDSLImpl.kt$DriverDSLImpl$private fun startNodeInternal(config: NodeConfig, webAddress: NetworkHostAndPort, localNetworkMap: LocalNetworkMap?, parameters: NodeParameters): CordaFuture&lt;NodeHandle&gt;</ID>
<ID>ComplexMethod:DriverDSLImpl.kt$DriverDSLImpl$private fun startRegisteredNode(name: CordaX500Name, localNetworkMap: LocalNetworkMap?, parameters: NodeParameters, p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture&lt;NodeHandle&gt;</ID>
<ID>ComplexMethod:Expect.kt$ fun &lt;S, E : Any&gt; S.genericExpectEvents( isStrict: Boolean = true, stream: S.((E) -&gt; Unit) -&gt; Unit, expectCompose: () -&gt; ExpectCompose&lt;E&gt; )</ID>
<ID>ComplexMethod:FinalityFlow.kt$FinalityFlow$@Suspendable @Throws(NotaryException::class) override fun call(): SignedTransaction</ID>
<ID>ComplexMethod:FlowMonitor.kt$FlowMonitor$private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest&lt;*&gt;, flow: FlowStateMachineImpl&lt;*&gt;, now: Instant): String</ID>
@ -690,7 +688,7 @@
<ID>LongParameterList:DriverDSL.kt$DriverDSL$( defaultParameters: NodeParameters = NodeParameters(), providedName: CordaX500Name? = defaultParameters.providedName, rpcUsers: List&lt;User&gt; = defaultParameters.rpcUsers, verifierType: VerifierType = defaultParameters.verifierType, customOverrides: Map&lt;String, Any?&gt; = defaultParameters.customOverrides, startInSameProcess: Boolean? = defaultParameters.startInSameProcess, maximumHeapSize: String = defaultParameters.maximumHeapSize )</ID>
<ID>LongParameterList:DriverDSL.kt$DriverDSL$( defaultParameters: NodeParameters = NodeParameters(), providedName: CordaX500Name? = defaultParameters.providedName, rpcUsers: List&lt;User&gt; = defaultParameters.rpcUsers, verifierType: VerifierType = defaultParameters.verifierType, customOverrides: Map&lt;String, Any?&gt; = defaultParameters.customOverrides, startInSameProcess: Boolean? = defaultParameters.startInSameProcess, maximumHeapSize: String = defaultParameters.maximumHeapSize, logLevelOverride: String? = defaultParameters.logLevelOverride )</ID>
<ID>LongParameterList:DriverDSLImpl.kt$( isDebug: Boolean = DriverParameters().isDebug, driverDirectory: Path = DriverParameters().driverDirectory, portAllocation: PortAllocation = DriverParameters().portAllocation, debugPortAllocation: PortAllocation = DriverParameters().debugPortAllocation, systemProperties: Map&lt;String, String&gt; = DriverParameters().systemProperties, useTestClock: Boolean = DriverParameters().useTestClock, startNodesInProcess: Boolean = DriverParameters().startNodesInProcess, extraCordappPackagesToScan: List&lt;String&gt; = @Suppress("DEPRECATION") DriverParameters().extraCordappPackagesToScan, waitForAllNodesToFinish: Boolean = DriverParameters().waitForAllNodesToFinish, notarySpecs: List&lt;NotarySpec&gt; = DriverParameters().notarySpecs, jmxPolicy: JmxPolicy = DriverParameters().jmxPolicy, networkParameters: NetworkParameters = DriverParameters().networkParameters, compatibilityZone: CompatibilityZoneParams? = null, notaryCustomOverrides: Map&lt;String, Any?&gt; = DriverParameters().notaryCustomOverrides, inMemoryDB: Boolean = DriverParameters().inMemoryDB, cordappsForAllNodes: Collection&lt;TestCordappInternal&gt;? = null, dsl: DriverDSLImpl.() -&gt; A )</ID>
<ID>LongParameterList:DriverDSLImpl.kt$DriverDSLImpl.Companion$( config: NodeConfig, quasarJarPath: String, debugPort: Int?, overriddenSystemProperties: Map&lt;String, String&gt;, maximumHeapSize: String, logLevelOverride: String?, vararg extraCmdLineFlag: String )</ID>
<ID>LongParameterList:DriverDSLImpl.kt$DriverDSLImpl.Companion$( config: NodeConfig, quasarJarPath: String, debugPort: Int?, bytemanJarPath: String?, bytemanPort: Int?, overriddenSystemProperties: Map&lt;String, String&gt;, maximumHeapSize: String, logLevelOverride: String?, vararg extraCmdLineFlag: String )</ID>
<ID>LongParameterList:DummyFungibleContract.kt$DummyFungibleContract$(inputs: List&lt;State&gt;, outputs: List&lt;State&gt;, tx: LedgerTransaction, issueCommand: CommandWithParties&lt;Commands.Issue&gt;, currency: Currency, issuer: PartyAndReference)</ID>
<ID>LongParameterList:IRS.kt$FloatingRatePaymentEvent$(date: LocalDate = this.date, accrualStartDate: LocalDate = this.accrualStartDate, accrualEndDate: LocalDate = this.accrualEndDate, dayCountBasisDay: DayCountBasisDay = this.dayCountBasisDay, dayCountBasisYear: DayCountBasisYear = this.dayCountBasisYear, fixingDate: LocalDate = this.fixingDate, notional: Amount&lt;Currency&gt; = this.notional, rate: Rate = this.rate)</ID>
<ID>LongParameterList:IRS.kt$InterestRateSwap$(floatingLeg: FloatingLeg, fixedLeg: FixedLeg, calculation: Calculation, common: Common, oracle: Party, notary: Party)</ID>
@ -2080,9 +2078,9 @@
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl$private</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl$val flowOverrideConfig = FlowOverrideConfig(parameters.flowOverrides.map { FlowOverride(it.key.canonicalName, it.value.canonicalName) })</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl$val jdbcUrl = "jdbc:h2:mem:persistence${inMemoryCounter.getAndIncrement()};DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100"</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl$val process = startOutOfProcessNode(config, quasarJarPath, debugPort, systemProperties, parameters.maximumHeapSize, parameters.logLevelOverride)</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl.Companion$if (bytemanAgent != null &amp;&amp; debugPort != null) listOf("-Dorg.jboss.byteman.verbose=true", "-Dorg.jboss.byteman.debug=true") else emptyList()</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl.Companion$private operator fun Config.plus(property: Pair&lt;String, Any&gt;)</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl.Companion${ log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) // Write node.conf writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly()) val systemProperties = mutableMapOf( "name" to config.corda.myLegalName, "visualvm.display.name" to "corda-${config.corda.myLegalName}" ) debugPort?.let { systemProperties += "log4j2.level" to "debug" systemProperties += "log4j2.debug" to "true" } systemProperties += inheritFromParentProcess() systemProperties += overriddenSystemProperties // See experimental/quasar-hook/README.md for how to generate. val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;" + "com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**;)" val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + "-javaagent:$quasarJarPath=$excludePattern" val loggingLevel = when { logLevelOverride != null -&gt; logLevelOverride debugPort == null -&gt; "INFO" else -&gt; "DEBUG" } val arguments = mutableListOf( "--base-directory=${config.corda.baseDirectory}", "--logging-level=$loggingLevel", "--no-local-shell").also { it += extraCmdLineFlag }.toList() // 
The following dependencies are excluded from the classpath of the created JVM, so that the environment resembles a real one as close as possible. // These are either classes that will be added as attachments to the node (i.e. samples, finance, opengamma etc.) or irrelevant testing libraries (test, corda-mock etc.). // TODO: There is pending work to fix this issue without custom blacklisting. See: https://r3-cev.atlassian.net/browse/CORDA-2164. val exclude = listOf("samples", "finance", "integrationTest", "test", "corda-mock", "com.opengamma.strata") val cp = ProcessUtilities.defaultClassPath.filterNot { cpEntry -&gt; exclude.any { token -&gt; cpEntry.contains("${File.separatorChar}$token") } || cpEntry.endsWith("-tests.jar") } return ProcessUtilities.startJavaProcess( className = "net.corda.node.Corda", // cannot directly get class for this, so just use string arguments = arguments, jdwpPort = debugPort, extraJvmArguments = extraJvmArguments, workingDirectory = config.corda.baseDirectory, maximumHeapSize = maximumHeapSize, classPath = cp ) }</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$DriverDSLImpl.Companion${ log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " + "debug port is " + (debugPort ?: "not enabled") + ", " + "byteMan: " + if (bytemanJarPath == null) "not in classpath" else "port is " + (bytemanPort ?: "not enabled")) // Write node.conf writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly()) val systemProperties = mutableMapOf( "name" to config.corda.myLegalName, "visualvm.display.name" to "corda-${config.corda.myLegalName}" ) debugPort?.let { systemProperties += "log4j2.level" to "debug" systemProperties += "log4j2.debug" to "true" } systemProperties += inheritFromParentProcess() systemProperties += overriddenSystemProperties // See experimental/quasar-hook/README.md for how to generate. val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**;org.jolokia**;" + "com.lmax**;picocli**;liquibase**;com.github.benmanes**;org.json**;org.postgresql**;nonapi.io.github.classgraph**;)" val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + "-javaagent:$quasarJarPath=$excludePattern" val loggingLevel = when { logLevelOverride != null -&gt; logLevelOverride debugPort == null -&gt; "INFO" else -&gt; "DEBUG" } val arguments = mutableListOf( 
"--base-directory=${config.corda.baseDirectory}", "--logging-level=$loggingLevel", "--no-local-shell").also { it += extraCmdLineFlag }.toList() val bytemanJvmArgs = { val bytemanAgent = bytemanJarPath?.let { bytemanPort?.let { "-javaagent:$bytemanJarPath=port:$bytemanPort,listener:true" } } listOfNotNull(bytemanAgent) + if (bytemanAgent != null &amp;&amp; debugPort != null) listOf("-Dorg.jboss.byteman.verbose=true", "-Dorg.jboss.byteman.debug=true") else emptyList() }.invoke() // The following dependencies are excluded from the classpath of the created JVM, so that the environment resembles a real one as close as possible. // These are either classes that will be added as attachments to the node (i.e. samples, finance, opengamma etc.) or irrelevant testing libraries (test, corda-mock etc.). // TODO: There is pending work to fix this issue without custom blacklisting. See: https://r3-cev.atlassian.net/browse/CORDA-2164. val exclude = listOf("samples", "finance", "integrationTest", "test", "corda-mock", "com.opengamma.strata") val cp = ProcessUtilities.defaultClassPath.filterNot { cpEntry -&gt; exclude.any { token -&gt; cpEntry.contains("${File.separatorChar}$token") } || cpEntry.endsWith("-tests.jar") } return ProcessUtilities.startJavaProcess( className = "net.corda.node.Corda", // cannot directly get class for this, so just use string arguments = arguments, jdwpPort = debugPort, extraJvmArguments = extraJvmArguments + bytemanJvmArgs, workingDirectory = config.corda.baseDirectory, maximumHeapSize = maximumHeapSize, classPath = cp ) }</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$InternalDriverDSL$ fun &lt;A&gt; pollUntilNonNull(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -&gt; A?): CordaFuture&lt;A&gt;</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$InternalDriverDSL$ fun pollUntilTrue(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -&gt; Boolean): CordaFuture&lt;Unit&gt;</ID>
<ID>MaxLineLength:DriverDSLImpl.kt$fun DriverDSL.startNode(providedName: CordaX500Name, devMode: Boolean, parameters: NodeParameters = NodeParameters()): CordaFuture&lt;NodeHandle&gt;</ID>
@ -3859,7 +3857,7 @@
<ID>SpreadOperator:DemoBench.kt$DemoBench.Companion$(DemoBench::class.java, *args)</ID>
<ID>SpreadOperator:DevCertificatesTest.kt$DevCertificatesTest$(*oldX509Certificates)</ID>
<ID>SpreadOperator:DockerInstantiator.kt$DockerInstantiator$(*it.toTypedArray())</ID>
<ID>SpreadOperator:DriverDSLImpl.kt$DriverDSLImpl$( config, quasarJarPath, debugPort, systemProperties, "512m", null, *extraCmdLineFlag )</ID>
<ID>SpreadOperator:DriverDSLImpl.kt$DriverDSLImpl$( config, quasarJarPath, debugPort, bytemanJarPath, null, systemProperties, "512m", null, *extraCmdLineFlag )</ID>
<ID>SpreadOperator:DummyContract.kt$DummyContract.Companion$( /* INPUTS */ *priors.toTypedArray(), /* COMMAND */ Command(cmd, priorState.owner.owningKey), /* OUTPUT */ StateAndContract(state, PROGRAM_ID) )</ID>
<ID>SpreadOperator:DummyContract.kt$DummyContract.Companion$(*items)</ID>
<ID>SpreadOperator:DummyContractV2.kt$DummyContractV2.Companion$( /* INPUTS */ *priors.toTypedArray(), /* COMMAND */ Command(cmd, priorState.owners.map { it.owningKey }), /* OUTPUT */ StateAndContract(state, DummyContractV2.PROGRAM_ID) )</ID>

View File

@ -266,6 +266,7 @@ class InitiatorFlow(val arg1: Boolean, val arg2: Int, private val counterparty:
val ourOutputState: DummyState = DummyState()
// DOCEND 22
// Or as copies of other states with some properties changed.
@Suppress("MagicNumber") // literally a magic number
// DOCSTART 23
val ourOtherOutputState: DummyState = ourOutputState.copy(magicNumber = 77)
// DOCEND 23

View File

@ -51,7 +51,13 @@ Specifically, there are two main ways a flow is hospitalized:
* **Database constraint violation** (``ConstraintViolationException``):
This scenario may occur due to natural contention between racing flows as Corda delegates handling using the database's optimistic concurrency control.
As the likelihood of re-occurrence should be low, the flow will actually error and fail if it experiences this at the same point more than 3 times. No intervention required.
If this exception occurs, the flow will retry. After retrying a number of times, the errored flow is kept in for observation.
* ``SQLTransientConnectionException``:
Database connection pooling errors are dealt with. If this exception occurs, the flow will retry. After retrying a number of times, the errored flow is kept in for observation.
* All other instances of ``SQLException``:
Any ``SQLException`` that is thrown and not handled by any of the scenarios detailed above, will be kept in for observation after their first failure.
* **Finality Flow handling** - Corda 3.x (old style) ``FinalityFlow`` and Corda 4.x ``ReceiveFinalityFlow`` handling:
If on the receive side of the finality flow, any error will result in the flow being kept in for observation to allow the cause of the
@ -64,7 +70,8 @@ Specifically, there are two main ways a flow is hospitalized:
The time is hard to document as the notary members, if actually alive, will inform the requester of the ETA of a response.
This can occur an infinite number of times. i.e. we never give up notarising. No intervention required.
* ``SQLTransientConnectionException``:
Database connection pooling errors are dealt with. If this exception occurs, the flow will retry. After retrying a number of times, the errored flow is kept in for observation.
* **Internal Corda errors**:
Flows that experience errors from inside the Corda statemachine, that are not handled by any of the scenarios detailed above, will be retried a number of times
and then kept in for observation if the error continues.
.. note:: Flows that are kept in for observation are retried upon node restart.

View File

@ -18,6 +18,7 @@ object CashSchema
* First version of a cash contract ORM schema that maps all fields of the [Cash] contract state as it stood
* at the time of writing.
*/
@Suppress("MagicNumber") // SQL column length
@CordaSerializable
object CashSchemaV1 : MappedSchema(schemaFamily = CashSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCashState::class.java)) {

View File

@ -22,6 +22,7 @@ object CommercialPaperSchema
* as it stood at the time of writing.
*/
@CordaSerializable
@Suppress("MagicNumber") // SQL column length
object CommercialPaperSchemaV1 : MappedSchema(schemaFamily = CommercialPaperSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCommercialPaperState::class.java)) {
override val migrationResource = "commercial-paper.changelog-master"

View File

@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger
import javax.persistence.AttributeConverter
import javax.persistence.PersistenceException
import javax.sql.DataSource
/**
@ -98,7 +99,8 @@ class CordaPersistence(
cacheFactory: NamedCacheFactory,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
customClassLoader: ClassLoader? = null,
val closeConnection: Boolean = true
val closeConnection: Boolean = true,
val errorHandler: (t: Throwable) -> Unit = {}
) : Closeable {
companion object {
private val log = contextLogger()
@ -189,10 +191,18 @@ class CordaPersistence(
}
fun createSession(): Connection {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
_contextDatabase.set(this)
currentDBSession().flush()
return contextTransaction.connection
try {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
_contextDatabase.set(this)
currentDBSession().flush()
return contextTransaction.connection
} catch (sqlException: SQLException) {
errorHandler(sqlException)
throw sqlException
} catch (persistenceException: PersistenceException) {
errorHandler(persistenceException)
throw persistenceException
}
}
/**
@ -220,10 +230,18 @@ class CordaPersistence(
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
_contextDatabase.set(this)
val outer = contextTransactionOrNull
return if (outer != null) {
outer.statement()
} else {
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
try {
return if (outer != null) {
outer.statement()
} else {
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
}
} catch (sqlException: SQLException) {
errorHandler(sqlException)
throw sqlException
} catch (persistenceException: PersistenceException) {
errorHandler(persistenceException)
throw persistenceException
}
}

View File

@ -228,6 +228,12 @@ dependencies {
// Required by JVMAgentUtil (x-compatible java 8 & 11 agent lookup mechanism)
compile files("${System.properties['java.home']}/../lib/tools.jar")
// Byteman for runtime (termination) rules injection on the running node
// Submission tool allowing to install rules on running nodes
integrationTestCompile "org.jboss.byteman:byteman-submit:4.0.3"
// The actual Byteman agent which should only be in the classpath of the out of process nodes
integrationTestCompile "org.jboss.byteman:byteman:4.0.3"
testCompile(project(':test-cli'))
testCompile(project(':test-utils'))
@ -237,6 +243,8 @@ dependencies {
slowIntegrationTestCompile configurations.testCompile
slowIntegrationTestRuntime configurations.runtime
slowIntegrationTestRuntime configurations.testRuntime
testCompile project(':testing:cordapps:dbfailure:dbfworkflows')
}
tasks.withType(JavaCompile) {

View File

@ -17,6 +17,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.FlowTimeoutException
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
@ -25,6 +26,7 @@ import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.lang.management.ManagementFactory
@ -46,6 +48,12 @@ class FlowRetryTest {
TransientConnectionFailureFlow.retryCount = -1
WrappedTransientConnectionFailureFlow.retryCount = -1
GeneralExternalFailureFlow.retryCount = -1
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { true }
}
@After
fun cleanUp() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
}
@Test
@ -390,7 +398,9 @@ class WrappedTransientConnectionFailureFlow(private val party: Party) : FlowLogi
initiateFlow(party).send("hello there")
// checkpoint will restart the flow after the send
retryCount += 1
throw IllegalStateException("wrapped error message", IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")/*.fillInStackTrace()*/))
throw IllegalStateException(
"wrapped error message",
IllegalStateException("another layer deep", SQLTransientConnectionException("Connection is not available")))
}
}

View File

@ -135,12 +135,13 @@ class InitFlow(private val party: Party) : FlowLogic<String>() {
}
}
@Suppress("TooGenericExceptionThrown")
@InitiatedBy(InitFlow::class)
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
initiatingSession.receive<String>().unwrap { it }
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
throw Exception("Something went wrong!", SQLException("Oops!"))
}
}

View File

@ -0,0 +1,313 @@
package net.corda.node.services.vault
import com.r3.dbfailure.workflows.CreateStateFlow
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
import net.corda.core.CordaRuntimeException
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.junit.After
import org.junit.Assert
import org.junit.Test
import rx.exceptions.OnErrorNotImplementedException
import java.sql.SQLException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertFailsWith
class VaultObserverExceptionTest {
companion object {
    /** Logger shared by the tests in this class. */
    val log = contextLogger()

    /**
     * Looks up the CorDapps every test node is started with: the contracts, workflows and
     * schemas packages under `com.r3.dbfailure`, in that order.
     */
    private fun testCordapps() = listOf(
            "com.r3.dbfailure.contracts",
            "com.r3.dbfailure.workflows",
            "com.r3.dbfailure.schemas"
    ).map { findCordapp(it) }
}
/**
 * Runs after every test and empties the hook collections on [StaffedFlowHospital] that the tests
 * in this class add callbacks to, so a callback registered by one test cannot fire in the next.
 */
@After
fun tearDown() {
// Conditions registered by tests to observe the throwables seen by the DatabaseEndocrinologist.
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
// Callbacks counting flows kept for overnight observation.
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
// Callbacks counting flows admitted to the hospital.
StaffedFlowHospital.onFlowAdmitted.clear()
}
/**
 * Causing a [SQLException] via a syntax error in a vault observer must make the flow hit the
 * DatabaseEndocrinologist in the flow hospital, where it is kept for overnight observation.
 *
 * The test registers a custom condition on the DatabaseEndocrinologist and passes once that
 * condition has been handed a [SQLException]. It fails if the condition is handed an
 * [OnErrorNotImplementedException] (the error must arrive unwrapped), if the flow's result
 * future completes first, or if nothing happens within 30 seconds.
 */
@Test
fun unhandledSqlExceptionFromVaultObserverGetsHospitatlised() {
    // Completed with true by the hospital condition, or with false by the flow's result future.
    val sawSqlException = openFuture<Boolean>().toCompletableFuture()
    StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { error ->
        if (error is OnErrorNotImplementedException) {
            Assert.fail("OnErrorNotImplementedException should be unwrapped")
        } else if (error is SQLException) {
            sawSqlException.complete(true)
        }
        // Always report "no match"; this condition only observes.
        false
    }

    val parameters = DriverParameters(startNodesInProcess = true, cordappsForAllNodes = testCordapps())
    driver(parameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val flowResult = node.rpc.startFlow(
                ::Initiator,
                "Syntax Error in Custom SQL",
                errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
        ).returnValue
        flowResult.then { sawSqlException.complete(false) }

        Assert.assertTrue(sawSqlException.getOrThrow(30.seconds))
    }
}
/**
 * Throwing an exception that is not SQL related from a vault observer aborts the flow when user
 * code does not handle it: waiting on the flow's result must fail with a [CordaRuntimeException].
 */
@Test
fun otherExceptionsFromVaultObserverBringFlowDown() {
    val parameters = DriverParameters(startNodesInProcess = true, cordappsForAllNodes = testCordapps())
    driver(parameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        // NOTE(review): the string is the message reported if the assertion fails; it is not
        // matched against the message of the thrown exception.
        assertFailsWith<CordaRuntimeException>("Toys out of pram") {
            val flowResult = node.rpc.startFlow(
                    ::Initiator,
                    "InvalidParameterException",
                    errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter)
            ).returnValue
            flowResult.getOrThrow(30.seconds)
        }
    }
}
/**
 * An arbitrary exception thrown from a vault observer brings the Rx observer down, but the flow
 * that triggered the observer can handle it and still complete (for some values of success).
 *
 * The flow is started with both the "service throws" and the "flow swallows errors" targets and
 * its result must become available within 30 seconds without throwing.
 */
@Test
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
    val parameters = DriverParameters(startNodesInProcess = true, cordappsForAllNodes = testCordapps())
    driver(parameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        val errorTargets = errorTargetsToNum(
                CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
                CreateStateFlow.ErrorTarget.FlowSwallowErrors)
        val flowResult = node.rpc.startFlow(::Initiator, "InvalidParameterException", errorTargets).returnValue
        flowResult.getOrThrow(30.seconds)
    }
}
/**
 * If the state being persisted triggers a persistence exception, the flow hospital retries the
 * flow and keeps it in for observation when the error persists.
 *
 * Checked here as: waiting 30 seconds for the flow result times out, the hospital admitted the
 * flow at least once, and exactly one overnight observation was recorded.
 */
@Test
fun persistenceExceptionOnCommitGetsRetriedAndThenGetsKeptForObservation() {
    var admissions = 0
    var observations = 0
    StaffedFlowHospital.onFlowAdmitted.add { ++admissions }
    StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observations }

    val parameters = DriverParameters(startNodesInProcess = true, cordappsForAllNodes = testCordapps())
    driver(parameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        // The flow never returns to the caller, so the wait is expected to time out.
        assertFailsWith<TimeoutException> {
            val flowResult = node.rpc.startFlow(
                    ::Initiator,
                    "EntityManager",
                    errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState)
            ).returnValue
            flowResult.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
        }
    }

    Assert.assertTrue("Exception from service has not been to Hospital", admissions > 0)
    Assert.assertEquals(1, observations)
}
/**
 * If a state causing a database error is lined up for persistence, obtaining a JDBC connection in
 * the vault observer triggers a flush that throws. The flow is then kept in for observation.
 *
 * Checked here as: waiting 30 seconds for the flow result times out, the DatabaseEndocrinologist
 * was handed at least one [PersistenceException] (and never an [OnErrorNotImplementedException]),
 * and exactly one overnight observation was recorded.
 */
@Test
fun persistenceExceptionOnFlushGetsRetriedAndThenGetsKeptForObservation() {
    var persistenceErrors = 0
    StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { error ->
        if (error is OnErrorNotImplementedException) {
            Assert.fail("OnErrorNotImplementedException should be unwrapped")
        } else if (error is PersistenceException) {
            ++persistenceErrors
            log.info("Got a PersistentException in the flow hospital count = $persistenceErrors")
        }
        // Always report "no match"; this condition only observes.
        false
    }
    var observations = 0
    StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observations }

    val parameters = DriverParameters(startNodesInProcess = true, cordappsForAllNodes = testCordapps())
    driver(parameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()

        // NOTE(review): the string is the message reported if the assertion fails; it is not
        // matched against the thrown exception.
        assertFailsWith<TimeoutException>("PersistenceException") {
            val flowResult = node.rpc.startFlow(
                    ::Initiator,
                    "EntityManager",
                    errorTargetsToNum(
                            CreateStateFlow.ErrorTarget.ServiceValidUpdate,
                            CreateStateFlow.ErrorTarget.TxInvalidState)
            ).returnValue
            flowResult.getOrThrow(30.seconds)
        }
    }

    Assert.assertTrue("Flow has not been to hospital", persistenceErrors > 0)
    Assert.assertEquals(1, observations)
}
/**
 * With a state that causes a database error queued for persistence, requesting the JDBC connection in
 * the vault observer triggers a flush that throws.
 *
 * Catching and suppressing that exception in the flow, around the code that triggers the vault observer,
 * does not change the outcome: the first exception in the service brings the service down and is caught
 * by the flow, but the state machine errors the flow anyway because Corda code threw.
 */
@Test
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInFlow() {
    var counter = 0
    StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { error ->
        if (error is OnErrorNotImplementedException) {
            Assert.fail("OnErrorNotImplementedException should be unwrapped")
        } else if (error is PersistenceException) {
            ++counter
            log.info("Got a PersistentException in the flow hospital count = $counter")
        }
        // The condition only observes the error; it always answers false.
        false
    }

    val driverParameters = DriverParameters(
            startNodesInProcess = true,
            cordappsForAllNodes = testCordapps())
    driver(driverParameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val rpc = node.rpc
        val errorTargets = CreateStateFlow.errorTargetsToNum(
                CreateStateFlow.ErrorTarget.ServiceValidUpdate,
                CreateStateFlow.ErrorTarget.TxInvalidState,
                CreateStateFlow.ErrorTarget.FlowSwallowErrors)
        val flowResult = rpc.startFlow(::Initiator, "EntityManager", errorTargets).returnValue
        // The caller is expected to time out rather than receive a result.
        assertFailsWith<TimeoutException>("PersistenceException") {
            flowResult.getOrThrow(30.seconds)
        }
        Assert.assertTrue("Flow has not been to hospital", counter > 0)
    }
}
/**
 * With a state that causes a persistence exception queued for persistence, requesting the JDBC connection in
 * the vault observer triggers a flush that throws.
 *
 * Catching and suppressing that exception inside the service protects the service, but the new interceptor
 * fails the flow anyway. The flow is kept in for observation if the errors persist.
 */
@Test
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInService() {
    var counter = 0
    StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { error ->
        if (error is OnErrorNotImplementedException) {
            Assert.fail("OnErrorNotImplementedException should be unwrapped")
        } else if (error is PersistenceException) {
            ++counter
            log.info("Got a PersistentException in the flow hospital count = $counter")
        }
        // The condition only observes the error; it always answers false.
        false
    }

    val driverParameters = DriverParameters(
            startNodesInProcess = true,
            cordappsForAllNodes = testCordapps())
    driver(driverParameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val rpc = node.rpc
        val errorTargets = CreateStateFlow.errorTargetsToNum(
                CreateStateFlow.ErrorTarget.ServiceValidUpdate,
                CreateStateFlow.ErrorTarget.TxInvalidState,
                CreateStateFlow.ErrorTarget.ServiceSwallowErrors)
        val flowResult = rpc.startFlow(::Initiator, "EntityManager", errorTargets).returnValue
        // The caller is expected to time out rather than receive a result.
        assertFailsWith<TimeoutException>("PersistenceException") {
            flowResult.getOrThrow(30.seconds)
        }
        Assert.assertTrue("Flow has not been to hospital", counter > 0)
    }
}
/**
 * User code that throws a syntax error in a raw vault observer breaks the recordTransaction call, so
 * handling it in flow code does not help: the error is passed to the flow hospital via the interceptor.
 *
 * The test races two outcomes on one future: `true` if the hospital keeps the flow for overnight observation,
 * `false` if the flow's result future completes.
 */
@Test
fun syntaxErrorInUserCodeInServiceCannotBeSuppressedInFlow() {
    val testControlFuture = openFuture<Boolean>()
    StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
        log.info("Flow has been kept for overnight observation")
        testControlFuture.set(true)
    }

    val driverParameters = DriverParameters(
            startNodesInProcess = true,
            cordappsForAllNodes = testCordapps())
    driver(driverParameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val rpc = node.rpc
        val errorTargets = CreateStateFlow.errorTargetsToNum(
                CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
                CreateStateFlow.ErrorTarget.FlowSwallowErrors)
        val flowResult = rpc.startFlow(::Initiator, "EntityManager", errorTargets).returnValue
        flowResult.then {
            log.info("Flow has finished")
            testControlFuture.set(false)
        }
        val keptInHospital = testControlFuture.getOrThrow(30.seconds)
        Assert.assertTrue("Flow has not been kept in hospital", keptInHospital)
    }
}
/**
 * User code that throws a syntax error and catches and suppresses it within the observer code is fine
 * and should not have any impact on the rest of the flow.
 */
@Test
fun syntaxErrorInUserCodeInServiceCanBeSuppressedInService() {
    val driverParameters = DriverParameters(
            startNodesInProcess = true,
            cordappsForAllNodes = testCordapps())
    driver(driverParameters) {
        val rpcUser = User("user", "foo", setOf(Permissions.all()))
        val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
        val rpc = node.rpc
        val errorTargets = CreateStateFlow.errorTargetsToNum(
                CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
                CreateStateFlow.ErrorTarget.ServiceSwallowErrors)
        val flowResult = rpc.startFlow(::Initiator, "EntityManager", errorTargets).returnValue
        // The test passes when the flow result becomes available within 30 seconds without throwing.
        flowResult.getOrThrow(30.seconds)
    }
}
}

View File

@ -1097,6 +1097,7 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi
override val deduplicationHandler: DeduplicationHandler
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val flowLogic: FlowLogic<T>
get() = logic
override val context: InvocationContext
@ -1139,8 +1140,17 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
@Suppress("DEPRECATION")
org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val attributeConverters = listOf(PublicKeyToTextConverter(), AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters, customClassLoader)
return CordaPersistence(
databaseConfig,
schemaService.schemaOptions.keys,
jdbcUrl,
cacheFactory,
attributeConverters, customClassLoader,
errorHandler = { t ->
FlowStateMachineImpl.currentStateMachine()?.scheduleEvent(Event.Error(t))
})
}
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {

View File

@ -29,6 +29,7 @@ object NodeInfoSchemaV1 : MappedSchema(
@Column(name = "node_info_id", nullable = false)
var id: Int,
@Suppress("MagicNumber") // database column width
@Column(name = "node_info_hash", length = 64, nullable = false)
val hash: String,

View File

@ -11,6 +11,7 @@ import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
@ -239,6 +240,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef, override val flowLogic: FlowLogic<Any?>, override val context: InvocationContext) : DeduplicationHandler, ExternalEvent.ExternalStartFlowEvent<Any?> {
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val externalCause: ExternalEvent
get() = this
override val deduplicationHandler: FlowStartDeduplicationHandler

View File

@ -2,12 +2,16 @@ package net.corda.node.services.identity
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.identity.x500Matches
import net.corda.core.internal.CertRole
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.hash
import net.corda.core.internal.toSet
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
@ -29,13 +33,18 @@ import org.hibernate.annotations.Type
import org.hibernate.internal.util.collections.ArrayHelper.EMPTY_BYTE_ARRAY
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.*
import java.security.cert.CertPathValidatorException
import java.security.cert.CertStore
import java.security.cert.CertificateExpiredException
import java.security.cert.CertificateNotYetValidException
import java.security.cert.CollectionCertStoreParameters
import java.security.cert.TrustAnchor
import java.security.cert.X509Certificate
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import kotlin.IllegalStateException
import kotlin.collections.HashSet
import kotlin.streams.toList
@ -147,6 +156,7 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
@javax.persistence.Table(name = NAME_TO_HASH_TABLE_NAME)
class PersistentPartyToPublicKeyHash(
@Id
@Suppress("MagicNumber") // database column width
@Column(name = NAME_COLUMN_NAME, length = 128, nullable = false)
var name: String = "",

View File

@ -85,6 +85,7 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa
}
@Entity
@Suppress("MagicNumber") // database column width
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
class ProcessedMessage(
@Id

View File

@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
@ -424,6 +425,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private inner class MessageDeduplicationHandler(val artemisMessage: ClientMessage, override val receivedMessage: ReceivedMessage) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
override val externalCause: ExternalEvent
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val deduplicationHandler: MessageDeduplicationHandler
get() = this

View File

@ -29,6 +29,7 @@ class DBCheckpointStorage : CheckpointStorage {
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBCheckpoint(
@Id
@Suppress("MagicNumber") // database column width
@Column(name = "checkpoint_id", length = 64, nullable = false)
var checkpointId: String = "",

View File

@ -30,6 +30,7 @@ import kotlin.streams.toList
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
@Suppress("MagicNumber") // database column width
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
class DBTransaction(

View File

@ -120,10 +120,21 @@ class ActionExecutorImpl(
}
}
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause
@Suspendable
private fun executeAcknowledgeMessages(action: Action.AcknowledgeMessages) {
action.deduplicationHandlers.forEach {
it.afterDatabaseTransaction()
try {
it.afterDatabaseTransaction()
} catch (e: Exception) {
// Catch all exceptions that occur in the [DeduplicationHandler]s (although errors should be unlikely)
// It is deemed safe for errors to occur here
// Therefore the current transition should not fail if something does go wrong
log.info(
"An error occurred executing a deduplication post-database commit handler. Continuing, as it is safe to do so.",
e
)
}
}
}
@ -218,17 +229,24 @@ class ActionExecutorImpl(
}
}
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, see comment in the catch clause
@Suspendable
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
}
)
try {
val operationFuture = action.operation.execute(action.deduplicationId)
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
}
)
} catch (e: Exception) {
// Catch and wrap any unexpected exceptions from the async operation
// Wrapping the exception allows it to be better handled by the flow hospital
throw AsyncOperationTransitionException(e)
}
}
private fun executeRetryFlowFromSafePoint(action: Action.RetryFlowFromSafePoint) {

View File

@ -16,6 +16,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializedBytes
@ -113,7 +114,7 @@ class SingleThreadedStateMachineManager(
private var checkpointSerializationContext: CheckpointSerializationContext? = null
private var actionExecutor: ActionExecutor? = null
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
override val flowHospital: StaffedFlowHospital = makeFlowHospital()
private val transitionExecutor = makeTransitionExecutor()
override val allStateMachines: List<FlowLogic<*>>
@ -210,12 +211,14 @@ class SingleThreadedStateMachineManager(
}
private fun <A> startFlow(
flowId: StateMachineRunId,
flowLogic: FlowLogic<A>,
context: InvocationContext,
ourIdentity: Party?,
deduplicationHandler: DeduplicationHandler?
): CordaFuture<FlowStateMachine<A>> {
return startFlowInternal(
flowId,
invocationContext = context,
flowLogic = flowLogic,
flowStart = FlowStart.Explicit,
@ -230,7 +233,10 @@ class SingleThreadedStateMachineManager(
cancelTimeoutIfScheduled(id)
val flow = flows.remove(id)
if (flow != null) {
logger.debug("Killing flow known to physical node.")
flow.fiber.transientState?.let {
flow.fiber.transientState = TransientReference(it.value.copy(isRemoved = true))
}
logger.info("Killing flow $id known to this node.")
decrementLiveFibers()
totalFinishedFlows.inc()
try {
@ -239,6 +245,7 @@ class SingleThreadedStateMachineManager(
} finally {
database.transaction {
checkpointStorage.removeCheckpoint(id)
serviceHub.vaultService.softLockRelease(id.uuid)
}
transitionExecutor.forceRemoveFlow(id)
unfinishedFibers.countDown()
@ -343,58 +350,71 @@ class SingleThreadedStateMachineManager(
}
}
@Suppress("TooGenericExceptionCaught", "ComplexMethod", "MaxLineLength") // this is fully intentional here, see comment in the catch clause
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
// Get set of external events
val flowId = currentState.flowLogic.runId
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
try {
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow
createFlowFromCheckpoint(
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow
createFlowFromCheckpoint(
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
) ?: return
} else {
// Just flow initiation message
null
}
mutex.locked {
if (stopping) {
return
) ?: return
} else {
// Just flow initiation message
null
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
mutex.locked {
if (stopping) {
return
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} catch (e: Exception) {
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
throw e
}
}
@ -410,7 +430,13 @@ class SingleThreadedStateMachineManager(
}
private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) {
val future = startFlow(event.flowLogic, event.context, ourIdentity = null, deduplicationHandler = event.deduplicationHandler)
val future = startFlow(
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
)
event.wireUpFuture(future)
}
@ -476,7 +502,16 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
is InitiatedFlowFactory.CorDapp -> null
}
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
startInitiatedFlow(
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
)
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
@ -503,7 +538,9 @@ class SingleThreadedStateMachineManager(
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
}
@Suppress("LongParameterList")
private fun <A> startInitiatedFlow(
flowId: StateMachineRunId,
flowLogic: FlowLogic<A>,
initiatingMessageDeduplicationHandler: DeduplicationHandler,
peerSession: FlowSessionImpl,
@ -515,13 +552,19 @@ class SingleThreadedStateMachineManager(
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
val ourIdentity = ourFirstIdentity
startFlowInternal(
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
flowId,
InvocationContext.peer(peerSession.counterparty.name),
flowLogic,
flowStart,
ourIdentity,
initiatingMessageDeduplicationHandler,
isStartIdempotent = false
)
}
@Suppress("LongParameterList")
private fun <A> startFlowInternal(
flowId: StateMachineRunId,
invocationContext: InvocationContext,
flowLogic: FlowLogic<A>,
flowStart: FlowStart,
@ -529,7 +572,6 @@ class SingleThreadedStateMachineManager(
deduplicationHandler: DeduplicationHandler?,
isStartIdempotent: Boolean
): CordaFuture<FlowStateMachine<A>> {
val flowId = StateMachineRunId.createRandom()
// Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties
// have access to the fiber (and thereby the service hub)
@ -541,22 +583,44 @@ class SingleThreadedStateMachineManager(
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val initialCheckpoint = Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
val flowAlreadyExists = mutex.locked { flows[flowId] != null }
val existingCheckpoint = if (flowAlreadyExists) {
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " +
"Something is very wrong. The flow will not retry.")
}
} else {
checkpoint
}
}
} else {
// This is a brand new flow
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
).getOrThrow()
val startedFuture = openFuture<Unit>()
val initialState = StateMachineState(
checkpoint = initialCheckpoint,
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
isAnyCheckpointPersisted = false,
isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = flowLogic,
@ -817,6 +881,12 @@ class SingleThreadedStateMachineManager(
return interceptors.fold(transitionExecutor) { executor, interceptor -> interceptor(executor) }
}
/**
 * Builds the [StaffedFlowHospital] for this state machine manager from the flow messaging layer,
 * the service hub's clock and this node's sender UUID.
 *
 * NOTE(review): the comment previously attached here said that a node running as a notary service does not
 * retain errored session initiation requests for missing CorDapps (to avoid memory leaks under heavy load).
 * Nothing in this function is notary-specific - confirm whether that behaviour lives elsewhere.
 */
private fun makeFlowHospital(): StaffedFlowHospital =
        StaffedFlowHospital(flowMessaging, serviceHub.clock, ourSenderUUID)
private fun InnerState.removeFlowOrderly(
flow: Flow,
removalReason: FlowRemovalReason.OrderlyFinish,

View File

@ -10,48 +10,103 @@ import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.concurrent.timerTask
import kotlin.math.pow
/**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
private companion object {
class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
private val clock: Clock,
private val ourSenderUUID: String) {
companion object {
private val log = contextLogger()
private val staff = listOf(
DeadlockNurse,
DuplicateInsertSpecialist,
DoctorTimeout,
FinalityDoctor,
TransientConnectionCardiologist
DeadlockNurse,
DuplicateInsertSpecialist,
DoctorTimeout,
FinalityDoctor,
TransientConnectionCardiologist,
DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner
)
@VisibleForTesting
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
}
private val hospitalJobTimer = Timer("FlowHospitalJobTimer", true)
init {
    // Register a task that logs, once a minute (first run after one minute), the flows and the erroneous
    // session initiations that are currently being kept for overnight observation.
    val logInterval = 1.minutes.toMillis()
    hospitalJobTimer.scheduleAtFixedRate(timerTask {
        mutex.locked {
            if (flowsInHospital.isNotEmpty()) {
                // A patient is under overnight observation when the most recent entry in its medical records has
                // Outcome.OVERNIGHT_OBSERVATION. lastOrNull() is used rather than last(): an empty record list
                // would otherwise throw, and an uncaught exception in a TimerTask stops the timer for good.
                val patientsUnderOvernightObservation = flowsInHospital.filter {
                    flowPatients[it.key]?.records?.lastOrNull()?.outcome == Outcome.OVERNIGHT_OBSERVATION
                }
                if (patientsUnderOvernightObservation.isNotEmpty()) {
                    log.warn("There are ${patientsUnderOvernightObservation.size} flows kept for overnight observation. " +
                            "Affected flow ids: ${patientsUnderOvernightObservation.keys.joinToString { it.uuid.toString() }}")
                }
            }
            if (treatableSessionInits.isNotEmpty()) {
                log.warn("There are ${treatableSessionInits.size} erroneous session initiations kept for overnight observation. " +
                        "Erroneous session initiation ids: ${treatableSessionInits.keys.joinToString()}")
            }
        }
    }, logInterval, logInterval)
}
/**
* Represents the flows that have been admitted to the hospital for treatment.
* Flows should be removed from [flowsInHospital] when they have completed a successful transition.
*/
private val flowsInHospital = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
private val mutex = ThreadBox(object {
/**
* Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,
* but their medical history will be retained.
*
* Flows should be removed from [flowPatients] when they have completed successfully. Upon successful completion,
* the medical history of a flow is no longer relevant as that flow has been completely removed from the
* statemachine.
*/
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
val recordsPublisher = PublishSubject.create<MedicalRecord>()
})
private val secureRandom = newSecureRandom()
private val delayedDischargeTimer = Timer("FlowHospitalDelayedDischargeTimer", true)
/**
* The node was unable to initiate the [InitialSessionMessage] from [sender].
*/
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
val time = Instant.now()
val time = clock.instant()
val id = UUID.randomUUID()
val outcome = if (error is SessionRejectException.UnknownClass) {
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
@ -104,11 +159,48 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
}
/**
* The flow running in [flowFiber] has errored.
* Forces the flow to be kept in for overnight observation by the hospital. A flow must already exist inside the hospital
* and have existing medical records for it to be moved to overnight observation. If it does not meet these criteria then
* an [IllegalArgumentException] will be thrown.
*
* @param id The [StateMachineRunId] of the flow that you are trying to force into observation
* @param errors The errors to include in the new medical record
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
val time = Instant.now()
fun forceIntoOvernightObservation(id: StateMachineRunId, errors: List<Throwable>) {
    mutex.locked {
        // Preconditions: the flow must currently be in the hospital and must already have a medical history.
        // If either is missing, the flow is in an invalid state or this function was called from the wrong
        // place, and the IllegalArgumentException raised here is meant to error out the flow.
        requireNotNull(flowsInHospital[id]) { "Flow must already be in the hospital before forcing into overnight observation" }
        val medicalHistory = requireNotNull(flowPatients[id]) { "Flow must already have history before forcing into overnight observation" }

        // Derive the new record from the most recent one, so the staff recorded on that entry are carried
        // over; only the time, the errors and the outcome are replaced.
        val latestRecord = medicalHistory.records.last()
        val observationRecord = latestRecord.copy(
                time = clock.instant(),
                errors = errors,
                outcome = Outcome.OVERNIGHT_OBSERVATION
        )

        // Notify the hooks first, then store the record and publish it.
        for (hook in onFlowKeptForOvernightObservation) {
            hook.invoke(id, observationRecord.by.map { it.toString() })
        }
        medicalHistory.records += observationRecord
        recordsPublisher.onNext(observationRecord)
    }
}
/**
 * Asks the hospital to treat the flow running in [flowFiber].
 *
 * The request is ignored when [currentState] is marked as removed, or when the flow is already registered in
 * the hospital; otherwise the flow is registered and admitted with the given [errors].
 */
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
    // A removed flow is never registered with the hospital.
    if (currentState.isRemoved) return
    // putIfAbsent returns the existing entry when the flow is already being treated.
    if (flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) != null) return
    admit(flowFiber, currentState, errors)
}
@Suppress("ComplexMethod")
private fun admit(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
val time = clock.instant()
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
onFlowAdmitted.forEach { it.invoke(flowFiber.id) }
val (event, backOffForChronicCondition) = mutex.locked {
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
@ -119,15 +211,17 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
Diagnosis.DISCHARGE -> {
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
}
Diagnosis.NOT_MY_SPECIALTY -> {
// None of the staff care for these errors so we let them propagate
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate
log.info("Flow error allowed to propagate", report.error)
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
}
@ -143,10 +237,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
if (backOffForChronicCondition.isZero) {
flowFiber.scheduleEvent(event)
} else {
delayedDischargeTimer.schedule(object : TimerTask() {
override fun run() {
flowFiber.scheduleEvent(event)
}
hospitalJobTimer.schedule(timerTask {
flowFiber.scheduleEvent(event)
}, backOffForChronicCondition.toMillis())
}
}
@ -185,12 +277,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List<Staff>)
/**
* The flow has been removed from the state machine.
* Remove the flow's medical history from the hospital.
*/
fun flowRemoved(flowId: StateMachineRunId) {
fun removeMedicalHistory(flowId: StateMachineRunId) {
    // The patient map is only touched while holding the hospital mutex
    mutex.locked {
        flowPatients.remove(flowId)
    }
}
/**
* Remove the flow from the hospital as it is not currently being treated.
*/
fun leave(id: StateMachineRunId) {
    // Dropping the entry allows the flow to be admitted again by a later treatment request
    flowsInHospital.remove(id)
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
/** Returns a stream of medical records as flows pass through the hospital. */
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
@ -251,6 +350,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
/** The order of the enum values are in priority order. */
enum class Diagnosis {
/** The flow should not see other staff members */
TERMINAL,
/** Retry from last safe point. */
DISCHARGE,
/** Park and await intervention. */
@ -259,7 +360,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
NOT_MY_SPECIALTY
}
interface Staff {
    /**
     * Examine [newError] for the flow running in [flowFiber] and return this staff member's [Diagnosis].
     *
     * @param flowFiber The fiber of the flow being examined
     * @param currentState The state machine state passed in by the hospital
     * @param newError The error to diagnose
     * @param history The flow's medical history
     */
    fun consult(
        flowFiber: FlowFiber,
        currentState: StateMachineState,
        newError: Throwable,
        history: FlowMedicalHistory
    ): Diagnosis
}
@ -288,7 +388,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
*/
object DuplicateInsertSpecialist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (newError.mentionsThrowable(ConstraintViolationException::class.java) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
return if (newError.mentionsThrowable(ConstraintViolationException::class.java)
&& history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
Diagnosis.DISCHARGE
} else {
Diagnosis.NOT_MY_SPECIALTY
@ -334,7 +435,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
}
private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean {
return when(error) {
return when (error) {
is UnexpectedFlowEndException -> {
val peer = DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", error).value
peer != null
@ -358,17 +459,21 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
val strippedStacktrace = error.stackTrace
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
return strippedStacktrace.isNotEmpty() &&
strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! )
return strippedStacktrace.isNotEmpty()
&& strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
}
}
/**
* Detects [SQLTransientConnectionException]s that arise from failing to connect to the underlying database/datasource
*/
object TransientConnectionCardiologist : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
override fun consult(
flowFiber: FlowFiber,
currentState: StateMachineState,
newError: Throwable,
history: FlowMedicalHistory
): Diagnosis {
return if (mentionsTransientConnection(newError)) {
if (history.notDischargedForTheSameThingMoreThan(2, this, currentState)) {
Diagnosis.DISCHARGE
@ -384,6 +489,72 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
return exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available")
}
}
/**
* Hospitalise any database (SQL and Persistence) exception that wasn't handled otherwise, unless on the configurable whitelist
* Note that retry decisions from other specialists will not be affected as retries take precedence over hospitalisation.
*/
object DatabaseEndocrinologist : Staff {
    /** Predicates that, when any of them matches an error, stop that error from being kept for observation here. */
    @VisibleForTesting
    val customConditions = mutableSetOf<(t: Throwable) -> Boolean>()

    override fun consult(
            flowFiber: FlowFiber,
            currentState: StateMachineState,
            newError: Throwable,
            history: FlowMedicalHistory
    ): Diagnosis {
        val isDatabaseError = newError is SQLException || newError is PersistenceException
        // The custom conditions are only evaluated for database errors, and evaluation stops at the first match
        val keepForObservation = isDatabaseError && customConditions.none { exempts -> exempts(newError) }
        return if (keepForObservation) Diagnosis.OVERNIGHT_OBSERVATION else Diagnosis.NOT_MY_SPECIALTY
    }
}
/**
 * Handles exceptions from internal state transitions that are not dealt with by the rest of the staff.
 *
 * [InterruptedException]s are diagnosed as [Diagnosis.TERMINAL] so they are never retried
 * (can occur when a flow is killed - `killFlow`).
 * [AsyncOperationTransitionException]s are ignored as the error is likely to have originated in user async code rather than inside
 * of a transition.
 * All other exceptions are retried a maximum of 3 times before being kept in for observation.
 */
object TransitionErrorGeneralPractitioner : Staff {
    override fun consult(
            flowFiber: FlowFiber,
            currentState: StateMachineState,
            newError: Throwable,
            history: FlowMedicalHistory
    ): Diagnosis {
        return if (newError.mentionsThrowable(StateTransitionException::class.java)) {
            when {
                newError.mentionsThrowable(InterruptedException::class.java) -> Diagnosis.TERMINAL
                newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
                history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
                else -> Diagnosis.OVERNIGHT_OBSERVATION
            }
        } else {
            Diagnosis.NOT_MY_SPECIALTY
        }.also { logDiagnosis(it, newError, flowFiber, history) }
    }

    /**
     * Logs, at debug level, every diagnosis other than [Diagnosis.NOT_MY_SPECIALTY].
     *
     * The action and event lines are only included when [newError] is itself a [StateTransitionException] that
     * carries them, so that the message never contains a literal "null" line.
     */
    private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) {
        if (diagnosis != Diagnosis.NOT_MY_SPECIALTY) {
            log.debug {
                val transitionError = newError as? StateTransitionException
                listOfNotNull(
                        "Flow ${flowFiber.id} given $diagnosis diagnosis due to a transition error",
                        "- Exception: ${newError.message}",
                        "- History: $history",
                        transitionError?.transitionAction?.let { "- Action: $it" },
                        transitionError?.transitionEvent?.let { "- Event: $it" }
                ).joinToString("\n")
            }
        }
    }
}
}
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
@ -396,4 +567,5 @@ private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>
true
}
return (exceptionType.isAssignableFrom(this::class.java) && containsMessage) || cause.mentionsThrowable(exceptionType, errorMessage)
}
}

View File

@ -114,6 +114,7 @@ interface ExternalEvent {
* An external P2P message event.
*/
interface ExternalMessageEvent : ExternalEvent {
// NOTE(review): presumably the id of the flow this message relates to - confirm against implementers
val flowId: StateMachineRunId
// The message carried by this event
val receivedMessage: ReceivedMessage
}
@ -121,6 +122,7 @@ interface ExternalEvent {
* An external request to start a flow, from the scheduler for example.
*/
interface ExternalStartFlowEvent<T> : ExternalEvent {
val flowId: StateMachineRunId
val flowLogic: FlowLogic<T>
val context: InvocationContext

View File

@ -0,0 +1,18 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaException
import net.corda.core.serialization.ConstructorForDeserialization
// CORDA-3353 - These exceptions should not be propagated up to rpc as they suppress the real exceptions
/**
 * Wraps an [exception] together with the [transitionAction] and [transitionEvent] it is associated with, either of
 * which may be null. The wrapped exception's message and the exception itself are passed on to [CordaException].
 *
 * The single-argument constructor is the one marked for deserialization; it leaves the action and event as null.
 */
class StateTransitionException(
val transitionAction: Action?,
val transitionEvent: Event?,
val exception: Exception
) : CordaException(exception.message, exception) {
@ConstructorForDeserialization
constructor(exception: Exception): this(null, null, exception)
}
/**
 * Wraps an exception, passing its message and the exception itself on to [CordaException].
 *
 * NOTE(review): presumably raised for failures that originate in async operation code rather than inside a
 * transition - confirm where it is thrown.
 */
class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)

View File

@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom
import javax.persistence.OptimisticLockException
/**
* This [TransitionExecutor] runs the transition actions using the passed in [ActionExecutor] and manually dirties the
@ -27,6 +28,7 @@ class TransitionExecutorImpl(
val log = contextLogger()
}
@Suppress("NestedBlockDepth", "ReturnCount")
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
@ -47,15 +49,24 @@ class TransitionExecutorImpl(
// Instead we just keep around the old error state and wait for a new schedule, perhaps
// triggered from a flow hospital
log.warn("Error while executing $action during transition to errored state, aborting transition", exception)
// CORDA-3354 - Go to the hospital with the new error that has occurred
// while already in an error state (as this error could be for a different reason)
return Pair(FlowContinuation.Abort, previousState.copy(isFlowResumed = false))
} else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
log.info("Error while executing $action, erroring state", exception)
log.info("Error while executing $action, with event $event, erroring state", exception)
if(previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception)
} else {
log.info("Error while executing $action, with event $event, erroring state", exception)
}
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), exception))
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
listOf(FlowError(secureRandom.nextLong(), StateTransitionException(action, event, exception)))
)
),
isFlowResumed = false
@ -67,4 +78,4 @@ class TransitionExecutorImpl(
}
return Pair(transition.continuation, transition.newState)
}
}
}

View File

@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap
* This interceptor records a trace of all of the flows' states and transitions. If the flow dirties it dumps the trace
* transition to the logger.
*/
@Suppress("MaxLineLength") // detekt confusing the whole if statement for a line
class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
companion object {
private val log = contextLogger()
@ -34,18 +35,23 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
val record = records.compute(fiber.id) { _, record ->
(record ?: ArrayList()).apply { add(transitionRecord) }
}
val (continuation, nextState)
= delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
// Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition
// information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere.
if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) {
log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}")
for (error in nextState.checkpoint.errorState.errors) {
log.warn("Flow ${fiber.id} error", error.exception)
if (!previousState.isRemoved) {
val transitionRecord =
TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
val record = records.compute(fiber.id) { _, record ->
(record ?: ArrayList()).apply { add(transitionRecord) }
}
// Just if we decide to propagate, and not if just on the way to the hospital. Only log at debug level here - the flow transition
// information is often unhelpful in the logs, and the actual cause of the problem will be logged elsewhere.
if (nextState.checkpoint.errorState is ErrorState.Errored && nextState.checkpoint.errorState.propagating) {
log.warn("Flow ${fiber.id} errored, dumping all transitions:\n${record!!.joinToString("\n")}")
for (error in nextState.checkpoint.errorState.errors) {
log.warn("Flow ${fiber.id} error", error.exception)
}
}
}

View File

@ -11,7 +11,6 @@ import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.ConcurrentHashMap
/**
* This interceptor notifies the passed in [flowHospital] in case a flow went through a clean->errored or a errored->clean
@ -27,12 +26,10 @@ class HospitalisingInterceptor(
}
private fun removeFlow(id: StateMachineRunId) {
hospitalisedFlows.remove(id)
flowHospital.flowRemoved(id)
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)
}
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
@ -41,19 +38,19 @@ class HospitalisingInterceptor(
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the hospital
// This is important for retrying a flow that has errored during a state machine transition
if (previousState.checkpoint.errorState is ErrorState.Clean) {
flowHospital.leave(fiber.id)
}
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
when (nextState.checkpoint.errorState) {
is ErrorState.Clean -> {
hospitalisedFlows.remove(fiber.id)
}
is ErrorState.Errored -> {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (hospitalisedFlows.putIfAbsent(fiber.id, fiber) == null) {
flowHospital.flowErrored(fiber, previousState, exceptionsToHandle)
}
}
}
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle)
}
if (nextState.isRemoved) {
removeFlow(fiber.id)
}

View File

@ -72,6 +72,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
var requestDate: Instant
)
@Suppress("MagicNumber") // database column length
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_txs")
class CommittedTransaction(

View File

@ -12,6 +12,7 @@ import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
@Suppress("MagicNumber") // database column length
class ContractUpgradeServiceImpl(cacheFactory: NamedCacheFactory) : ContractUpgradeService, SingletonSerializeAsToken() {
@Entity

View File

@ -23,6 +23,7 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.*
import org.hibernate.Session
import rx.Observable
import rx.exceptions.OnErrorNotImplementedException
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.time.Clock
@ -390,7 +391,15 @@ class NodeVaultService(
}
}
persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references)
updatesPublisher.onNext(vaultUpdate)
try {
updatesPublisher.onNext(vaultUpdate)
} catch (e: OnErrorNotImplementedException) {
log.warn("Caught an Rx.OnErrorNotImplementedException " +
"- caused by an exception in an RX observer that was unhandled " +
"- the observer has been unsubscribed! The underlying exception will be rethrown.", e)
// if the observer code threw, unwrap their exception from the RX wrapper
throw e.cause ?: e
}
}
}
}

View File

@ -26,6 +26,7 @@ object VaultSchema
/**
* First version of the Vault ORM schema
*/
@Suppress("MagicNumber") // database column length
@CordaSerializable
object VaultSchemaV1 : MappedSchema(
schemaFamily = VaultSchema.javaClass,

View File

@ -127,6 +127,7 @@ class BFTSmartNotaryService(
}
}
@Suppress("MagicNumber") // database column length
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}bft_committed_txs")
class CommittedTransaction(

View File

@ -104,6 +104,7 @@ class RaftUniquenessProvider(
var index: Long = 0
)
@Suppress("MagicNumber") // database column length
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}raft_committed_txs")
class CommittedTransaction(

View File

@ -37,7 +37,8 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
@ -115,18 +116,16 @@ class FlowFrameworkTests {
}
@Test
fun `exception while fiber suspended`() {
fun `exception while fiber suspended is retried and completes successfully`() {
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
val flow = ReceiveFlow(bob)
val fiber = aliceNode.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
val exceptionDuringSuspend = Exception("Thrown during suspend")
val throwingActionExecutor = SuspendThrowingActionExecutor(exceptionDuringSuspend, fiber.transientValues!!.value.actionExecutor)
val throwingActionExecutor = SuspendThrowingActionExecutor(Exception("Thrown during suspend"),
fiber.transientValues!!.value.actionExecutor)
fiber.transientValues = TransientReference(fiber.transientValues!!.value.copy(actionExecutor = throwingActionExecutor))
mockNet.runNetwork()
assertThatThrownBy {
fiber.resultFuture.getOrThrow()
}.isSameAs(exceptionDuringSuspend)
fiber.resultFuture.getOrThrow()
assertThat(aliceNode.smm.allStateMachines).isEmpty()
// Make sure the fiber does actually terminate
assertThat(fiber.state).isEqualTo(Strand.State.WAITING)

View File

@ -2,14 +2,18 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
@ -17,11 +21,13 @@ import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.TestIdentity
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.newContext
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.h2.util.Utils
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
@ -49,6 +55,8 @@ class RetryFlowMockTest {
SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0
KeepSendingFlow.count.set(0)
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError }
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError }
}
private fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
@ -58,6 +66,7 @@ class RetryFlowMockTest {
@After
fun cleanUp() {
mockNet.stopNodes()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
}
@Test
@ -66,14 +75,6 @@ class RetryFlowMockTest {
assertEquals(2, RetryFlow.count)
}
@Test
fun `Retry forever`() {
assertThatThrownBy {
nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow()
}.isInstanceOf(LimitedRetryCausingError::class.java)
assertEquals(5, RetryFlow.count)
}
@Test
fun `Retry does not set senderUUID`() {
val messagesSent = Collections.synchronizedList(mutableListOf<Message>())
@ -184,8 +185,7 @@ class RetryFlowMockTest {
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
}
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
class LimitedRetryCausingError : IllegalStateException("I am going to live forever")
class RetryCausingError : SQLException("deadlock")

View File

@ -6,6 +6,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.DummyCommandData
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.vault.DUMMY_DEAL_PROGRAM_ID
@ -16,12 +17,13 @@ import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.StartedMockNode
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ExecutionException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class VaultFlowTest {
private lateinit var mockNetwork: MockNetwork
@ -48,14 +50,19 @@ class VaultFlowTest {
@After
fun tearDown() {
mockNetwork.stopNodes()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
}
@Test
fun `Unique column constraint failing causes states to not persist to vaults`() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException })
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy {
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
}
val hospitalLatch = CountDownLatch(1)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> hospitalLatch.countDown() }
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity())))
assertTrue(hospitalLatch.await(10, TimeUnit.SECONDS), "Flow not hospitalised")
assertEquals(1, partyA.transaction {
partyA.services.vaultService.queryBy<UniqueDummyLinearContract.State>().states.size
})

View File

@ -78,6 +78,8 @@ include 'samples:network-verifier:contracts'
include 'samples:network-verifier:workflows'
include 'serialization'
include 'serialization-tests'
include 'testing:cordapps:dbfailure:dbfcontracts'
include 'testing:cordapps:dbfailure:dbfworkflows'
// Common libraries - start
include 'common-validation'

View File

@ -0,0 +1,18 @@
// Build script for the module whose jar is published as "testing-dbfailure-contracts".
apply plugin: 'kotlin'
// The cordapp and quasar-utils plugins are currently disabled for this module.
//apply plugin: 'net.corda.plugins.cordapp'
//apply plugin: 'net.corda.plugins.quasar-utils'
// Repositories searched for dependencies; the artifactory base URL comes from the artifactory_contextUrl property.
repositories {
mavenLocal()
mavenCentral()
maven { url "$artifactory_contextUrl/corda-dependencies" }
maven { url "$artifactory_contextUrl/corda" }
}
// The only declared dependency is the in-repository core project.
dependencies {
compile project(":core")
}
jar{
baseName "testing-dbfailure-contracts"
}

View File

@ -0,0 +1,50 @@
package com.r3.dbfailure.contracts
import com.r3.dbfailure.schemas.DbFailureSchemaV1
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.transactions.LedgerTransaction
import java.lang.IllegalArgumentException
/**
 * Contract whose [verify] accepts every transaction, together with the [TestState] and [Commands] that go with it.
 */
class DbFailureContract : Contract {
    companion object {
        /** Fully qualified class name of this contract. */
        @JvmStatic
        val ID = "com.r3.dbfailure.contracts.DbFailureContract"
    }

    /**
     * Linear state with a single participant that maps itself onto [DbFailureSchemaV1.PersistentTestState].
     */
    class TestState(
        override val linearId: UniqueIdentifier,
        val particpant: Party,
        val randomValue: String?,
        val errorTarget: Int = 0
    ) : LinearState, QueryableState {

        override val participants: List<AbstractParty> = listOf(particpant)

        override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DbFailureSchemaV1)

        /** Builds the persistent form of this state; any schema other than [DbFailureSchemaV1] is rejected. */
        override fun generateMappedObject(schema: MappedSchema): PersistentState = when (schema) {
            is DbFailureSchemaV1 ->
                DbFailureSchemaV1.PersistentTestState(particpant.name.toString(), randomValue, errorTarget, linearId.id)
            else -> throw IllegalArgumentException("Unsupported schema $schema")
        }
    }

    override fun verify(tx: LedgerTransaction) {
        // no op - don't care for now
    }

    interface Commands : CommandData {
        class Create : Commands
    }
}

View File

@ -0,0 +1,35 @@
package com.r3.dbfailure.schemas
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import java.util.*
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table
/** Schema family object referenced by [DbFailureSchemaV1]. */
object DbFailureSchema
/**
 * Version 1 of the [DbFailureSchema] family, mapping the single entity [PersistentTestState] and pointing at the
 * "dbfailure.changelog-master" migration resource.
 */
object DbFailureSchemaV1 : MappedSchema(
schemaFamily = DbFailureSchema.javaClass,
version = 1,
mappedTypes = listOf(DbFailureSchemaV1.PersistentTestState::class.java)){
override val migrationResource = "dbfailure.changelog-master"
/** Entity stored in the "fail_test_states" table. */
@Entity
@Table( name = "fail_test_states")
class PersistentTestState(
@Column( name = "participant")
var participantName: String,
// NOTE(review): the Kotlin type is nullable while the column is declared nullable = false - presumably
// deliberate so that a null value can provoke a constraint violation; confirm before "fixing" either side.
@Column( name = "random_value", nullable = false)
var randomValue: String?,
@Column( name = "error_target")
var errorTarget: Int,
@Column( name = "linear_id")
var linearId: UUID
) : PersistentState() {
// No-argument constructor filling every field with a placeholder value
constructor() : this( "", "", 0, UUID.randomUUID())
}
}

View File

@ -0,0 +1,8 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" >
<changeSet author="R3.Corda" id="test dbfailure error target">
<addColumn tableName="fail_test_states">
<column name="error_target" type="INT"></column>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,20 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd" >
<changeSet author="R3.Corda" id="test dbfailure init">
<createTable tableName="fail_test_states">
<column name="output_index" type="INT">
<constraints nullable="false"/>
</column>
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="participant" type="NVARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="random_value" type="NVARCHAR(255)">
<constraints nullable="false"/>
</column>
<column name="linear_id" type="BINARY(255)"/>
</createTable>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,8 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<include file="migration/dbfailure.changelog-init.xml"/>
<include file="migration/dbfailure.changelog-errortarget.xml"/>
</databaseChangeLog>

View File

@ -0,0 +1,12 @@
// Build script for the module whose jar is published as "testing-dbfailure-workflows".
apply plugin: 'kotlin'
// The cordapp and quasar-utils plugins are currently disabled for this module.
//apply plugin: 'net.corda.plugins.cordapp'
//apply plugin: 'net.corda.plugins.quasar-utils'
// Depends on the core project and on the matching dbfailure contracts module.
dependencies {
compile project(":core")
compile project(":testing:cordapps:dbfailure:dbfcontracts")
}
jar{
baseName "testing-dbfailure-workflows"
}

View File

@ -0,0 +1,99 @@
package com.r3.dbfailure.workflows
import co.paralleluniverse.fibers.Suspendable
import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.Command
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.transactions.TransactionBuilder
// The error target instructions are packed into the decimal digits of a single Int, hence the arithmetic below.
@Suppress("MagicNumber")
object CreateStateFlow {

    /**
     * Instructions that tell the test code where to fail and how to treat the failure.
     *
     * Every decimal position of the encoded number carries one instruction:
     *  - units: error action to be taken in the vault listener in the service
     *  - tens: error caused in the flow
     *  - hundreds: exception handling in the flow
     *  - thousands: exception handling in the service/vault listener
     */
    enum class ErrorTarget(val targetNumber: Int) {
        NoError(0),
        ServiceSqlSyntaxError(1),
        ServiceNullConstraintViolation(2),
        ServiceValidUpdate(3),
        ServiceReadState(4),
        ServiceCheckForState(5),
        ServiceThrowInvalidParameter(6),
        TxInvalidState(10),
        FlowSwallowErrors(100),
        ServiceSwallowErrors(1000)
    }

    /** Encodes the given [targets] as one number by adding up their [ErrorTarget.targetNumber]s. */
    fun errorTargetsToNum(vararg targets: ErrorTarget): Int =
            targets.fold(0) { total, target -> total + target.targetNumber }

    // Reverse lookup from target number to enum constant.
    private val targetMap = ErrorTarget.values().associateBy { it.targetNumber }

    /**
     * Extracts the decimal digit at position [magnitude] (1, 10, 100 or 1000) from [encoded] and looks up the
     * matching [ErrorTarget]. A null [encoded] yields [ErrorTarget.NoError]; a digit without a matching enum
     * constant makes the map lookup throw.
     */
    private fun decode(encoded: Int?, magnitude: Int): ErrorTarget {
        if (encoded == null) return ErrorTarget.NoError
        return targetMap.getValue(((encoded / magnitude) % 10) * magnitude)
    }

    /** Error action for the vault listener in the service (units digit of [target]). */
    fun getServiceTarget(target: Int?): ErrorTarget = decode(target, 1)

    /** Exception handling instruction for the service/vault listener (thousands digit of [target]). */
    fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget = decode(target, 1000)

    /** Error to be caused in the flow (tens digit of [target]). */
    fun getTxTarget(target: Int?): ErrorTarget = decode(target, 10)

    /** Exception handling instruction for the flow (hundreds digit of [target]). */
    fun getFlowTarget(target: Int?): ErrorTarget = decode(target, 100)

    /**
     * Builds, verifies, signs and locally records a transaction with a single [DbFailureContract.TestState]
     * output, and returns the linear id of that state.
     *
     * [errorTarget] is the encoded instruction number: it is stored in the state, decides whether the state is
     * created with a null value instead of [randomValue], and decides whether an error thrown while recording
     * the transaction is swallowed or rethrown.
     */
    @InitiatingFlow
    @StartableByRPC
    class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic<UniqueIdentifier>() {

        @Suspendable
        override fun call(): UniqueIdentifier {
            logger.info("Test flow: starting")
            val notary = serviceHub.networkMapCache.notaryIdentities[0]

            val txTarget = getTxTarget(errorTarget)
            logger.info("Test flow: The tx error target is $txTarget")

            // TxInvalidState asks for a state without a value.
            val stateValue = if (txTarget == ErrorTarget.TxInvalidState) null else randomValue
            val state = DbFailureContract.TestState(UniqueIdentifier(), ourIdentity, stateValue, errorTarget)
            val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)

            logger.info("Test flow: tx builder")
            val txBuilder = TransactionBuilder(notary)
                    .addOutputState(state)
                    .addCommand(txCommand)

            logger.info("Test flow: verify")
            txBuilder.verify(serviceHub)

            val signedTx = serviceHub.signInitialTransaction(txBuilder)

            @Suppress("TooGenericExceptionCaught") // deliberate: the error target decides what happens to any throwable
            try {
                logger.info("Test flow: recording transaction")
                serviceHub.recordTransactions(signedTx)
            } catch (t: Throwable) {
                val swallow = getFlowTarget(errorTarget) == ErrorTarget.FlowSwallowErrors
                if (!swallow) {
                    logger.info("Test flow: caught exception - rethrowing")
                    throw t
                }
                logger.info("Test flow: Swallowing all exception! Muahahaha!", t)
            }

            logger.info("Test flow: returning")
            return state.linearId
        }
    }
}

View File

@ -0,0 +1,98 @@
package com.r3.dbfailure.workflows
import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import java.security.InvalidParameterException
/**
 * Test service that subscribes to the raw vault update feed and, for every produced state, performs the
 * database action encoded in the units digit of the state's error target (see [CreateStateFlow.ErrorTarget]).
 *
 * Any throwable raised by that action is either logged and swallowed (when the error target requests
 * [CreateStateFlow.ErrorTarget.ServiceSwallowErrors]) or rethrown to the caller of the vault observer.
 * Produced states that are not a [DbFailureContract.TestState] result in no action.
 */
@CordaService
class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {

    companion object {
        val log = contextLogger()
    }

    init {
        // Executes a raw SQL statement on the service hub's JDBC session, logs its result set, and always
        // closes the statement afterwards so that a failing statement does not leak its JDBC resources.
        fun executeAndLog(sql: String) {
            services.jdbcSession().createStatement().use { statement ->
                statement.execute(sql)
                log.info("SQL result: ${statement.resultSet}")
            }
        }

        services.vaultService.rawUpdates.subscribe { (_, produced) ->
            produced.forEach { stateAndRef ->
                val contractState = stateAndRef.state.data as? DbFailureContract.TestState
                // Shared WHERE clause selecting the row that belongs to the produced state.
                val whereThisState =
                        "WHERE transaction_id = '${stateAndRef.ref.txhash}' AND output_index = ${stateAndRef.ref.index};"

                @Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
                try {
                    when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
                        CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
                            log.info("Fail with syntax error on raw statement")
                            // "BLAAA" instead of "SET" is the intended syntax error.
                            executeAndLog(
                                    "UPDATE FAIL_TEST_STATES \n" +
                                            "BLAAA RANDOM_VALUE = NULL\n" +
                                            whereThisState
                            )
                        }
                        CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
                            log.info("Fail with null constraint violation on raw statement")
                            executeAndLog(
                                    "UPDATE FAIL_TEST_STATES \n" +
                                            "SET RANDOM_VALUE = NULL\n" +
                                            whereThisState
                            )
                        }
                        CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
                            log.info("Update current statement")
                            // A null contractState has a null error target, which decodes to NoError, so this
                            // branch is only reached with a non-null state.
                            executeAndLog(
                                    "UPDATE FAIL_TEST_STATES \n" +
                                            "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
                                            whereThisState
                            )
                        }
                        CreateStateFlow.ErrorTarget.ServiceReadState -> {
                            log.info("Read current state from db")
                            executeAndLog(
                                    "SELECT * FROM FAIL_TEST_STATES \n" +
                                            whereThisState
                            )
                        }
                        CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
                            log.info("Check for currently written state in the db")
                            services.jdbcSession().createStatement().use { statement ->
                                val rs = statement.executeQuery(
                                        "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
                                                whereThisState
                                )
                                // Read the count by position: the label of an aggregate column is database specific.
                                val numOfRows = if (rs.next()) rs.getInt(1) else 0
                                log.info("Found a state with tx:ind ${stateAndRef.ref.txhash}:${stateAndRef.ref.index} in " +
                                        "FAIL_TEST_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
                            }
                        }
                        CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
                            log.info("Throw InvalidParameterException")
                            throw InvalidParameterException("Toys out of pram")
                        }
                        else -> {
                            // do nothing, everything else must be handled elsewhere
                        }
                    }
                } catch (t: Throwable) {
                    if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
                            == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
                        log.warn("Service not letting errors escape", t)
                    } else {
                        throw t
                    }
                }
            }
        }
    }
}

View File

@ -18,6 +18,7 @@ import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
@ -57,6 +58,7 @@ import rx.schedulers.Schedulers
import java.io.File
import java.net.ConnectException
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Path
import java.security.cert.X509Certificate
import java.time.Duration
@ -124,6 +126,14 @@ class DriverDSLImpl(
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy { resolveJar("co.paralleluniverse.fibers.Suspendable") }
// Path of the jar that provides the Byteman agent class, resolved lazily on first access.
// Any Exception thrown by resolveJar is swallowed and turned into null, so a missing Byteman jar is
// not an error here; callers have to handle the null case.
private val bytemanJarPath: String? by lazy {
try {
resolveJar("org.jboss.byteman.agent.Transformer")
} catch (e: Exception) {
null
}
}
private fun NodeConfig.checkAndOverrideForInMemoryDB(): NodeConfig = this.run {
if (inMemoryDB && corda.dataSourceProperties.getProperty("dataSource.url").startsWith("jdbc:h2:")) {
val jdbcUrl = "jdbc:h2:mem:persistence${inMemoryCounter.getAndIncrement()};DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100"
@ -178,7 +188,9 @@ class DriverDSLImpl(
}
}
override fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> {
override fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> = startNode(parameters, bytemanPort = null)
override fun startNode(parameters: NodeParameters, bytemanPort: Int?): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = parameters.providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
@ -193,15 +205,17 @@ class DriverDSLImpl(
return registrationFuture.flatMap {
networkMapAvailability.flatMap {
// But starting the node proper does require the network map
startRegisteredNode(name, it, parameters, p2pAddress)
startRegisteredNode(name, it, parameters, p2pAddress, bytemanPort)
}
}
}
@Suppress("ComplexMethod")
private fun startRegisteredNode(name: CordaX500Name,
localNetworkMap: LocalNetworkMap?,
parameters: NodeParameters,
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture<NodeHandle> {
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort(),
bytemanPort: Int? = null): CordaFuture<NodeHandle> {
val rpcAddress = portAllocation.nextHostAndPort()
val rpcAdminAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
@ -240,7 +254,7 @@ class DriverDSLImpl(
allowMissingConfig = true,
configOverrides = if (overrides.hasPath("devMode")) overrides else overrides + mapOf("devMode" to true)
)).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, localNetworkMap, parameters)
return startNodeInternal(config, webAddress, localNetworkMap, parameters, bytemanPort)
}
private fun startNodeRegistration(
@ -542,6 +556,8 @@ class DriverDSLImpl(
config,
quasarJarPath,
debugPort,
bytemanJarPath,
null,
systemProperties,
"512m",
null,
@ -553,10 +569,12 @@ class DriverDSLImpl(
}
}
@Suppress("ComplexMethod")
private fun startNodeInternal(config: NodeConfig,
webAddress: NetworkHostAndPort,
localNetworkMap: LocalNetworkMap?,
parameters: NodeParameters): CordaFuture<NodeHandle> {
parameters: NodeParameters,
bytemanPort: Int?): CordaFuture<NodeHandle> {
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
@ -602,7 +620,16 @@ class DriverDSLImpl(
nodeFuture
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, systemProperties, parameters.maximumHeapSize, parameters.logLevelOverride)
val process = startOutOfProcessNode(
config,
quasarJarPath,
debugPort,
bytemanJarPath,
bytemanPort,
systemProperties,
parameters.maximumHeapSize,
parameters.logLevelOverride
)
// Destroy the child process when the parent exits. This is needed even when `waitForAllNodesToFinish` is
// true because we don't want orphaned processes in the case that the parent process is terminated by the
@ -726,16 +753,21 @@ class DriverDSLImpl(
}
}
@Suppress("ComplexMethod", "MaxLineLength")
private fun startOutOfProcessNode(
config: NodeConfig,
quasarJarPath: String,
debugPort: Int?,
bytemanJarPath: String?,
bytemanPort: Int?,
overriddenSystemProperties: Map<String, String>,
maximumHeapSize: String,
logLevelOverride: String?,
vararg extraCmdLineFlag: String
): Process {
log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " +
"debug port is " + (debugPort ?: "not enabled") + ", " +
"byteMan: " + if (bytemanJarPath == null) "not in classpath" else "port is " + (bytemanPort ?: "not enabled"))
// Write node.conf
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe.toNodeOnly())
@ -777,6 +809,20 @@ class DriverDSLImpl(
it += extraCmdLineFlag
}.toList()
// Extra JVM arguments for Byteman, computed once by immediately invoking the lambda.
val bytemanJvmArgs = {
// The agent argument is only built when both the jar path and the port are non-null.
val bytemanAgent = bytemanJarPath?.let {
bytemanPort?.let {
"-javaagent:$bytemanJarPath=port:$bytemanPort,listener:true"
}
}
// The verbose/debug system properties are added only when the agent is enabled and a debug port is set;
// without an agent the resulting list is empty.
listOfNotNull(bytemanAgent) +
if (bytemanAgent != null && debugPort != null) listOf(
"-Dorg.jboss.byteman.verbose=true",
"-Dorg.jboss.byteman.debug=true"
)
else emptyList()
}.invoke()
// The following dependencies are excluded from the classpath of the created JVM, so that the environment resembles a real one as close as possible.
// These are either classes that will be added as attachments to the node (i.e. samples, finance, opengamma etc.) or irrelevant testing libraries (test, corda-mock etc.).
// TODO: There is pending work to fix this issue without custom blacklisting. See: https://r3-cev.atlassian.net/browse/CORDA-2164.
@ -789,7 +835,7 @@ class DriverDSLImpl(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = arguments,
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
extraJvmArguments = extraJvmArguments + bytemanJvmArgs,
workingDirectory = config.corda.baseDirectory,
maximumHeapSize = maximumHeapSize,
classPath = cp
@ -952,6 +998,11 @@ interface InternalDriverDSL : DriverDSL {
fun start()
fun shutdown()
fun startNode(
parameters: NodeParameters = NodeParameters(),
bytemanPort: Int? = null
): CordaFuture<NodeHandle>
}
/**

View File

@ -1,5 +1,6 @@
package net.corda.testing.node.internal
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.PLATFORM_VERSION
@ -268,6 +269,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
private inner class InMemoryDeduplicationHandler(override val receivedMessage: ReceivedMessage, val transfer: InMemoryMessagingNetwork.MessageTransfer) : DeduplicationHandler, ExternalEvent.ExternalMessageEvent {
override val externalCause: ExternalEvent
get() = this
override val flowId: StateMachineRunId = StateMachineRunId.createRandom()
override val deduplicationHandler: DeduplicationHandler
get() = this

View File

@ -135,6 +135,7 @@ class NodeController(check: atRuntime = ::checkExists) : Controller() {
}
}
@Suppress("MagicNumber") // initialising to max value
private fun makeNetworkParametersCopier(config: NodeConfigWrapper): NetworkParametersCopier {
val identity = getNotaryIdentity(config)
val parametersCopier = NetworkParametersCopier(NetworkParameters(

View File

@ -241,6 +241,7 @@ class NodeTabView : Fragment() {
CityDatabase.cityMap.values.map { it.countryCode }.toSet().map { it to Image(resources["/net/corda/demobench/flags/$it.png"]) }.toMap()
}
@Suppress("MagicNumber") // demobench UI magic
private fun Pane.nearestCityField(): ComboBox<WorldMapLocation> {
return combobox(model.nearestCity, CityDatabase.cityMap.values.toList().sortedBy { it.description }) {
minWidth = textWidth

View File

@ -70,6 +70,7 @@ fun main(args: Array<String>) {
}
}
@Suppress("MagicNumber") // test constants
private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
runLoadTests(loadTestConfiguration, listOf(
selfIssueTest to LoadTest.RunParameters(
@ -131,6 +132,7 @@ private fun runLoadTest(loadTestConfiguration: LoadTestConfiguration) {
))
}
@Suppress("MagicNumber") // test constants
private fun runStabilityTest(loadTestConfiguration: LoadTestConfiguration) {
runLoadTests(loadTestConfiguration, listOf(
// Self issue cash. This is a pre test step to make sure vault have enough cash to work with.

View File

@ -38,6 +38,7 @@ interface Volume {
nodeInfoFile.readBytes().deserialize<SignedNodeInfo>().verified().let { NotaryInfo(it.legalIdentities.first(), validating) }
}
@Suppress("MagicNumber") // default config constants
return notaryInfos.let {
NetworkParameters(
minimumPlatformVersion = 1,