mirror of
synced 2025-02-05 10:39:13 +00:00
Merge branch 'release/os/4.4' of github.com:corda/corda into new_checkpoint_schema
This commit is contained in:
@ -1,5 +1,5 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
@ -1,4 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
Normal file
Normal file
@ -0,0 +1,6 @@
@Library('corda-shared-build-pipeline-steps') _
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
onDemandTestPipeline('k8s', '.ci/dev/on-demand-tests/commentMappings.yml')
Normal file
Normal file
@ -0,0 +1,4 @@
integration: { allParallelIntegrationTest }
pr-merge: { parallelRegressionTest }
smoke: { allParallelSmokeTest }
unit: { allParallelUnitTest }
@ -1,4 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
@ -65,22 +65,13 @@ pipeline {
* copied to avoid collisions between files where the same test
* classes have run on multiple pods.
sh label: 'Compact test results',
shopt -s globstar
rm -rf allure-input
mkdir allure-input
for i in **/test-results-xml/**/test-runs/test-reports/**
[ -f $i ] &&
cp $i allure-input/$(echo $i | sed -e \\
echo "Finished compacting JUnit results"
includes: '**/test-results-xml/**/test-runs/test-reports/**',
targetLocation: 'allure-input',
flattenFiles: true,
renameFiles: true,
sourceCaptureExpression: '.*test-results-xml/.*-([\\d]+)/.*/([^/]+)$',
targetNameExpression: '$1-$2')])
allure includeProperties: false,
jdk: '',
results: [[path: '**/allure-input']]
@ -92,6 +83,40 @@ pipeline {
// We want to send a summary email, but want to limit to once per day.
// Comparing the dates of the previous and current builds achieves this,
// i.e. we will only send an email for the first build on a given day.
def prevBuildDate = new Date(
currentBuild?.previousBuild.timeInMillis ?: 0).clearTime()
def currentBuildDate = new Date(
if (prevBuildDate != currentBuildDate) {
def statusSymbol = '\u2753'
switch(currentBuild.result) {
case 'SUCCESS':
statusSymbol = '\u2705'
case 'UNSTABLE':
case 'FAILURE':
statusSymbol = '\u274c'
echo('First build for this date, sending summary email')
emailext to: '$DEFAULT_RECIPIENTS',
subject: "$statusSymbol" + '$BRANCH_NAME regression tests - $BUILD_STATUS',
mimeType: 'text/html',
body: '${SCRIPT, template="groovy-html.template"}'
} else {
echo('Already sent summary email today, suppressing')
cleanup {
deleteDir() /* clean up our workspace */
@ -1,4 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
@ -1,5 +1,5 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
@ -1,5 +1,5 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
@ -71,4 +71,4 @@ pipeline {
deleteDir() /* clean up our workspace */
@ -30,7 +30,7 @@ snakeYamlVersion=1.19
@ -1,11 +1,62 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.NamedByHash
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
* In the words of Matt working code is more important then pretty code. This class that contains code that may
* be serialized. If it were always serialized then the local disk fetch would need to serialize then de-serialize
* which wastes time. However over the wire we get batch fetch items serialized. This is because we need to get the exact
* length of the objects to pack them into the 10MB max message size buffer. We do not want to serialize them multiple times
* so it's a lot more efficient to send the byte stream.
class MaybeSerializedSignedTransaction(override val id: SecureHash, val serialized: SerializedBytes<SignedTransaction>?,
val nonSerialised: SignedTransaction?) : NamedByHash {
init {
check(serialized == null || nonSerialised == null) {
"MaybeSerializedSignedTransaction: Serialized and non-serialized may not both be non-null."
fun get(): SignedTransaction? {
return if (nonSerialised != null) {
} else if (serialized != null) {
val tranBytes = SerializedBytes<SignedTransaction>(serialized.bytes)
} else {
fun isNull(): Boolean {
return serialized == null && nonSerialised == null
fun serializedByteCount(): Int {
return serialized?.bytes?.size ?: 0
fun payloadContentDescription(): String {
val tranSize = serializedByteCount()
val isSer = serialized != null
val isObj = nonSerialised != null
return if (isNull()) {
} else "size = $tranSize, serialized = $isSer, isObj = $isObj"
* The [SendTransactionFlow] should be used to send a transaction to another peer that wishes to verify that transaction's
@ -40,6 +91,11 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
override fun call(): Void? {
val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize
val maxPayloadSize = networkMaxMessageSize / 2
logger.trace { "DataVendingFlow: Call: Network max message size = $networkMaxMessageSize, Max Payload Size = $maxPayloadSize" }
// The first payload will be the transaction data, subsequent payload will be the transaction/attachment/network parameters data.
var payload = payload
@ -64,20 +120,33 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
// to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of
// data request.
var loopCount = 0
while (true) {
val loopCnt = loopCount++
logger.trace { "DataVendingFlow: Main While [$loopCnt]..." }
val dataRequest = sendPayloadAndReceiveDataRequest(otherSideSession, payload).unwrap { request ->
logger.trace { "sendPayloadAndReceiveDataRequest(): ${request.javaClass.name}" }
when (request) {
is FetchDataFlow.Request.Data -> {
// Security TODO: Check for abnormally large or malformed data requests
FetchDataFlow.Request.End -> return null
FetchDataFlow.Request.End -> {
logger.trace { "DataVendingFlow: END" }
return null
logger.trace { "Sending data (Type = ${dataRequest.dataType.name})" }
var totalByteCount = 0
var firstItem = true
var batchFetchCountExceeded = false
var numSent = 0
payload = when (dataRequest.dataType) {
FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId ->
logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" }
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
@ -85,17 +154,71 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
?: throw FetchDataFlow.HashNotFound(txId)
totalByteCount += tx.txBits.size
// Loop on all items returned using dataRequest.hashes.map:
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
// Maybe we should not just throw here as it's not recoverable on the client side. Might be better to send a reason code or
// remove the restriction on sending once.
logger.trace { "Transaction authorised OK: '$txId'" }
var serialized: SerializedBytes<SignedTransaction>? = null
if (!batchFetchCountExceeded) {
// Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching
// is required, just reject all additional items.
val tx = serviceHub.validatedTransactions.getTransaction(txId)
?: throw FetchDataFlow.HashNotFound(txId)
logger.trace { "Transaction get OK: '$txId'" }
serialized = tx.serialize()
val itemByteCount = serialized.size
logger.trace { "Batch-Send '$txId': first = $firstItem, Total bytes = $totalByteCount, Item byte count = $itemByteCount, Maximum = $maxPayloadSize" }
if (firstItem || (totalByteCount + itemByteCount) < maxPayloadSize) {
totalByteCount += itemByteCount
// Always include at least one item else if the max is set too low nothing will ever get returned.
// Splitting items will be a separate Jira if need be
logger.trace { "Adding item to return set: '$txId'" }
} else {
logger.trace { "Fetch block size EXCEEDED at '$txId'." }
batchFetchCountExceeded = true
} // end
if (batchFetchCountExceeded) {
logger.trace { "Excluding '$txId' from return set due to exceeded count." }
// Send null if limit is exceeded
val maybeserialized = MaybeSerializedSignedTransaction(txId, if (batchFetchCountExceeded) {
} else {
}, null)
firstItem = false
} // Batch response loop end
FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map {
logger.trace { "Sending: Attachments for '$it'" }
?: throw FetchDataFlow.HashNotFound(it)
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
logger.trace { "Sending: Parameters for '$it'" }
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)
?: throw FetchDataFlow.MissingNetworkParameters(it)
FetchDataFlow.DataType.UNKNOWN -> dataRequest.hashes.map {
logger.warn("Message from from a future version of Corda with UNKNOWN enum value for FetchDataFlow.DataType: ID='$it'")
logger.trace { "Block total size = $totalByteCount: Num Items = ($numSent of ${dataRequest.hashes.size} total)" }
@ -8,10 +8,13 @@ import net.corda.core.crypto.sha256
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.CordaSerializationTransformEnumDefaults
import net.corda.core.serialization.SerializationToken
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
@ -20,6 +23,7 @@ import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
import java.nio.file.FileAlreadyExistsException
import java.util.*
@ -39,6 +43,7 @@ import java.util.*
* @param T The ultimate type of the data being fetched.
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
protected val requests: Set<SecureHash>,
protected val otherSideSession: FlowSession,
@ -54,7 +59,8 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
class MissingNetworkParameters(val requested: SecureHash) : FlowException("Failed to fetch network parameters with hash: $requested")
class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction (${requested}) that is not in the transitive dependency graph of the sent transaction.")
class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction ($requested)"
+ " that is not in the transitive dependency graph of the sent transaction.")
data class Result<out T : NamedByHash>(val fromDisk: List<T>, val downloaded: List<T>)
@ -65,9 +71,18 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
object End : Request()
// https://docs.corda.net/serialization-enum-evolution.html
// Below annotations added to map two new enum values (BATCH_TRANSACTION and UNKNOWN) onto TRANSACTION. The effect of this is that
// if a that does not have these enum values receives it will not throw an error during deserialization. The purpose of adding
// UNKNOWN is such that future additions can default to UNKNOWN rather than an existing value. In this instance we are protecting
// against not having unknown by using the platform version as a guard.
CordaSerializationTransformEnumDefault("BATCH_TRANSACTION", "TRANSACTION"),
CordaSerializationTransformEnumDefault("UNKNOWN", "TRANSACTION")
enum class DataType {
@ -77,9 +92,11 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}: No items to fetch." }
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, emptyList())
} else {
logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}, To-fetch size = ${toFetch.size}" }
logger.debug { "Requesting ${toFetch.size} dependency(s) for verification from ${otherSideSession.counterparty.name}" }
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
@ -89,17 +106,27 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
// configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems.
// Above that, we start losing authentication data on the message fragments and take exceptions in the
// network layer.
val maybeItems = ArrayList<W>(toFetch.size)
for (hash in toFetch) {
val maybeItems = ArrayList<W>()
if (toFetch.size == 1) {
val hash = toFetch.single()
// We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse.
// The only thing checked is the object type. It is a protocol violation to send results out of order.
// The only thing checked is the object type.
// TODO We need to page here after large messages will work.
logger.trace { "[Single fetch]: otherSideSession.sendAndReceive($hash): Fetch type: ${dataType.name}" }
// should only pass single item dataType below.
maybeItems += otherSideSession.sendAndReceive<List<W>>(Request.Data(NonEmptySet.of(hash), dataType)).unwrap { it }
} else {
logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive(set of ${toFetch.size}): Fetch type: ${dataType.name})" }
maybeItems += otherSideSession.sendAndReceive<List<W>>(Request.Data(NonEmptySet.copyOf(toFetch), dataType))
.unwrap { it }
logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive Done: count= ${maybeItems.size})" }
// Check for a buggy/malicious peer answering with something that we didn't ask for.
val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch)
logger.debug { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}" }
logger.trace { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}, maybeItems.size = ${maybeItems.size}" }
// Re-load items already present before the download procedure. This ensures these objects are not unnecessarily checkpointed.
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, downloaded)
@ -110,9 +137,9 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
// Do nothing by default.
private fun loadWhatWeHave(): Pair<List<SecureHash>, List<SecureHash>> {
private fun loadWhatWeHave(): Pair<List<SecureHash>, Set<SecureHash>> {
val fromDisk = ArrayList<SecureHash>()
val toFetch = ArrayList<SecureHash>()
val toFetch = LinkedHashSet<SecureHash>()
for (txid in requests) {
val stx = load(txid)
if (stx == null)
@ -137,18 +164,52 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
protected open fun convert(wire: W): T = uncheckedCast(wire)
private fun validateFetchResponse(maybeItems: UntrustworthyData<ArrayList<W>>,
requests: List<SecureHash>): List<T> {
requests: Set<SecureHash>): List<T> {
return maybeItems.unwrap { response ->
if (response.size != requests.size)
throw DownloadedVsRequestedSizeMismatch(requests.size, response.size)
logger.trace { "validateFetchResponse(): Response size = ${response.size}, Request size = ${requests.size}" }
if (response.size != requests.size) {
logger.trace { "maybeItems.unwrap: RespType Response.size (${requests.size}) != requests.size (${response.size})" }
throw FetchDataFlow.DownloadedVsRequestedSizeMismatch(requests.size, response.size)
if (logger.isTraceEnabled()) {
logger.trace { "Request size = ${requests.size}" }
for ((reqInd, req) in requests.withIndex()) {
logger.trace { "Requested[$reqInd] = '$req'" }
val answers = response.map { convert(it) }
if (logger.isTraceEnabled()) {
logger.trace { "Answers size = ${answers.size}" }
for ((respInd, item) in answers.withIndex()) {
if (item is MaybeSerializedSignedTransaction) {
logger.trace { "ValidateItem[$respInd]: '${item.id}': Type = MaybeSerializedSignedTransaction: ${item.payloadContentDescription()}" }
} else {
logger.trace("ValidateItem[$respInd]: Type = ${item.javaClass.name}")
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious flow violator or buggy.
for ((index, item) in answers.withIndex()) {
if (item.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], item.id)
var badDataIndex = -1
var badDataId: SecureHash? = null
for ((index, item) in requests.withIndex()) {
if (item != answers[index].id) {
badDataIndex = index
badDataId = item
logger.info("Will Throw on DownloadedVsRequestedDataMismatch(Req item = '$item', Resp item = '${answers[index].id}'")
if (badDataIndex >= 0 && badDataId != null) {
logger.error("Throwing DownloadedVsRequestedDataMismatch due to bad verification on: ID = $badDataId, Answer[$badDataIndex]='${answers[badDataIndex].id}'")
throw DownloadedVsRequestedDataMismatch(badDataId, answers[badDataIndex].id)
@ -212,13 +273,27 @@ class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {
override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? {
val tran = serviceHub.validatedTransactions.getTransaction(txid)
return if (tran == null) {
} else {
MaybeSerializedSignedTransaction(txid, null, tran)
* Given a set of hashes either loads from local network parameters storage or requests them from the other peer. Downloaded
* network parameters are saved to local parameters storage automatically. This flow can be used only if the minimumPlatformVersion is >= 4.
* Nodes on lower versions won't respond to this flow.
class FetchNetworkParametersFlow(requests: Set<SecureHash>,
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>, SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>,
SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
override fun load(txid: SecureHash): SignedDataWithCert<NetworkParameters>? {
return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid)
@ -235,4 +310,4 @@ class FetchNetworkParametersFlow(requests: Set<SecureHash>,
@ -9,6 +9,8 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
* Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide].
@ -36,15 +38,19 @@ class ResolveTransactionsFlow private constructor(
private var fetchNetParamsFromCounterpart = false
override fun call() {
// TODO This error should actually cause the flow to be sent to the flow hospital to be retried
val counterpartyPlatformVersion = checkNotNull(serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion) {
"Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache"
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
fetchNetParamsFromCounterpart = counterpartyPlatformVersion >= 4
val batchMode = counterpartyPlatformVersion >= 6
logger.debug { "ResolveTransactionsFlow.call(): Otherside Platform Version = '$counterpartyPlatformVersion': Batch mode = $batchMode" }
if (initialTx != null) {
@ -52,8 +58,9 @@ class ResolveTransactionsFlow private constructor(
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
logger.trace { "ResolveTransactionsFlow: Sending END." }
otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
@ -19,7 +19,7 @@ interface ServiceHubCoreInternal : ServiceHub {
interface TransactionsResolver {
fun downloadDependencies()
fun downloadDependencies(batchMode: Boolean)
fun recordDependencies(usedStatesToRecord: StatesToRecord)
@ -173,6 +173,7 @@
<ID>ComplexMethod:NodeNamedCache.kt$DefaultNamedCacheFactory$open protected fun <K, V> configuredForNamed(caffeine: Caffeine<K, V>, name: String): Caffeine<K, V></ID>
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$@Throws(VaultQueryException::class) private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging_: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T></ID>
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>></ID>
<ID>ComplexMethod:NodeVaultService.kt$NodeVaultService$private fun processAndNotify(updates: List<Vault.Update<ContractState>>)</ID>
<ID>ComplexMethod:ObjectDiffer.kt$ObjectDiffer$fun diff(a: Any?, b: Any?): DiffTree?</ID>
<ID>ComplexMethod:Obligation.kt$Obligation$override fun verify(tx: LedgerTransaction)</ID>
<ID>ComplexMethod:QuasarInstrumentationHook.kt$QuasarInstrumentationHookAgent.Companion$@JvmStatic fun premain(argumentsString: String?, instrumentation: Instrumentation)</ID>
@ -196,6 +197,7 @@
<ID>ComplexMethod:TlsDiffAlgorithmsTest.kt$TlsDiffAlgorithmsTest$@Test fun testClientServerTlsExchange()</ID>
<ID>ComplexMethod:TlsDiffProtocolsTest.kt$TlsDiffProtocolsTest$@Test fun testClientServerTlsExchange()</ID>
<ID>ComplexMethod:TransactionUtils.kt$ fun createComponentGroups(inputs: List<StateRef>, outputs: List<TransactionState<ContractState>>, commands: List<Command<*>>, attachments: List<SecureHash>, notary: Party?, timeWindow: TimeWindow?, references: List<StateRef>, networkParametersHash: SecureHash?): List<ComponentGroup></ID>
<ID>ComplexMethod:TransitionExecutorImpl.kt$TransitionExecutorImpl$@Suppress("NestedBlockDepth", "ReturnCount") @Suspendable override fun executeTransition( fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor ): Pair<FlowContinuation, StateMachineState></ID>
<ID>ComplexMethod:TypeModellingFingerPrinter.kt$FingerPrintingState$// For a type we haven't seen before, determine the correct path depending on the type of type it is. private fun fingerprintNewType(type: LocalTypeInformation)</ID>
<ID>ComplexMethod:UniversalContract.kt$UniversalContract$override fun verify(tx: LedgerTransaction)</ID>
<ID>ComplexMethod:Util.kt$fun <T> debugCompare(perLeft: Perceivable<T>, perRight: Perceivable<T>)</ID>
@ -1906,7 +1908,6 @@
<ID>MaxLineLength:CordaRPCOps.kt$sorting: Sort = Sort(emptySet())</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$assertThatCode { rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow() }.doesNotThrowAnyException()</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$val cash = rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow().stx.tx.outRefsOfType<Cash.State>().single()</ID>
<ID>MaxLineLength:CordaSSHAuthInfo.kt$CordaSSHAuthInfo : AuthInfo</ID>
<ID>MaxLineLength:CordaServiceTest.kt$CordaServiceTest$mockNet = MockNetwork(MockNetworkParameters(threadPerNode = true, cordappsForAllNodes = listOf(FINANCE_CONTRACTS_CORDAPP, enclosedCordapp())))</ID>
<ID>MaxLineLength:CordaServiceTest.kt$CordaServiceTest.CordaServiceThatRequiresThreadContextClassLoader$assertNotNull(Thread.currentThread().contextClassLoader, "thread context classloader should not be null during service initialisation")</ID>
<ID>MaxLineLength:CordaUtils.kt$ private fun owns(packageName: String, fullClassName: String): Boolean</ID>
@ -3889,6 +3890,7 @@
<ID>SpreadOperator:WithContracts.kt$WithContracts$(owner, magicNumber, *others)</ID>
<ID>ThrowsCount:AMQPTypeIdentifierParser.kt$AMQPTypeIdentifierParser$// Make sure our inputs aren't designed to blow things up. private fun validate(typeString: String)</ID>
<ID>ThrowsCount:AbstractNode.kt$AbstractNode$private fun installCordaServices()</ID>
<ID>ThrowsCount:AbstractNode.kt$fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name)</ID>
<ID>ThrowsCount:ArtemisMessagingServer.kt$ArtemisMessagingServer$// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from // Artemis IO errors @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) private fun configureAndStartServer()</ID>
<ID>ThrowsCount:BrokerJaasLoginModule.kt$BaseBrokerJaasLoginModule$@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate protected fun getUsernamePasswordAndCerts(): Triple<String, String, Array<javax.security.cert.X509Certificate>?></ID>
@ -3975,6 +3977,7 @@
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorHandling.kt$ErrorHandling.CheckpointAfterErrorFlow$t: Throwable</ID>
<ID>TooGenericExceptionCaught:EventProcessor.kt$EventProcessor$ex: Exception</ID>
<ID>TooGenericExceptionCaught:Eventually.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:Expect.kt$exception: Exception</ID>
@ -4109,6 +4112,7 @@
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" )</ID>
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )</ID>
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however they only have $senderQuantity!" )</ID>
<ID>TooGenericExceptionThrown:DbListenerService.kt$DbListenerService$throw Exception("Mother of all exceptions")</ID>
<ID>TooGenericExceptionThrown:FlowAsyncOperationTests.kt$FlowAsyncOperationTests.ErroredExecute$throw Exception()</ID>
<ID>TooGenericExceptionThrown:FlowFrameworkTests.kt$FlowFrameworkTests$throw Exception("Error")</ID>
<ID>TooGenericExceptionThrown:Generator.kt$Generator$throw Exception("Failed to generate", error)</ID>
@ -4406,9 +4410,6 @@
<ID>WildcardImport:CertificateRevocationListNodeTests.kt$import net.corda.nodeapi.internal.crypto.*</ID>
<ID>WildcardImport:CertificateRevocationListNodeTests.kt$import org.bouncycastle.asn1.x509.*</ID>
<ID>WildcardImport:CertificatesUtils.kt$import net.corda.nodeapi.internal.crypto.*</ID>
<ID>WildcardImport:CheckpointDumperImpl.kt$import com.fasterxml.jackson.databind.*</ID>
<ID>WildcardImport:CheckpointDumperImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CheckpointDumperImpl.kt$import net.corda.node.services.statemachine.*</ID>
<ID>WildcardImport:CheckpointSerializationAPI.kt$import net.corda.core.serialization.*</ID>
<ID>WildcardImport:ClassCarpenter.kt$import org.objectweb.asm.Opcodes.*</ID>
<ID>WildcardImport:ClassCarpenterTestUtils.kt$import net.corda.serialization.internal.amqp.*</ID>
@ -4561,7 +4562,6 @@
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowFrameworkTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowFrameworkTripartyTests.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:FlowLogic.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:FlowLogicRefFactoryImpl.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:FlowMatchers.kt$import net.corda.testing.internal.matchers.*</ID>
<ID>WildcardImport:FlowOverrideTests.kt$import net.corda.core.flows.*</ID>
@ -172,7 +172,7 @@ style:
ignoreNamedArgument: true
ignoreEnums: true
active: true
active: false
excludes: "**/buildSrc/**"
maxLineLength: 140
excludePackageStatements: true
@ -8,7 +8,8 @@ API: Service Classes
Service classes are long-lived instances that can trigger or be triggered by flows from within a node. A Service class is limited to a
single instance per node. During startup, the node handles the creation of the service.
single instance per node. During startup, the node handles the creation of the service. If there is problem when instantiating service
the node will report in the log what the problem was and terminate.
Services allow related, reusable, functions to be separated into their own class where their functionality is
grouped together. These functions can then be called from other services or flows.
@ -105,6 +105,10 @@ Unreleased
* SSH server in the :doc:`shell` has been updated to remove outdated weak ciphers and algorithms.
* Removed support for external SSH connections to the standalone shell. As a result, ``--sshd-port`` and ``--sshd-hostkey-directory``
options, as well as ``extensions.sshd`` configuration entry, have been removed from the standalone shell.
Available alternatives are either to use the standalone shell directly, or connect to the node embedded shell via SSH.
.. _changelog_v4.1:
Version 4.1
@ -314,10 +314,10 @@ extraNetworkMapKeys
.. _corda_configuration_flow_external_operation_thread_pool_size:
The number of threads available to execute external operations called from flows. See the documentation on
:ref:`calling external systems inside of flows <api_flows_external_operations>` for more information.
The number of threads available to execute external operations that have been called from flows. See the documentation on
:ref:`calling external systems inside flows <api_flows_external_operations>` for more information.
*Default:* Set to the number of available cores on the machine the node is running on
*Default:* Set to the lesser of either the maximum number of cores allocated to the node, or 10.
Duration of the period suspended flows waiting for IO are logged.
@ -49,7 +49,9 @@ updates related to issuance, sale, purchase and exit of bonds.
Writing and building apps that run on both Corda (open source) and Corda Enterprise
Corda and Corda Enterprise are compatible and interoperable, which means you can write a CorDapp that can run on both.
Corda and Corda Enterprise are moving towards an Open Core approach, which means in practice that the APIs and dependencies for CorDapps
should all be open source, and all CorDapps (whether targeting Corda open source or Corda Enterprise) can now be compiled against the Open
Source Corda core library, as Corda Enterprise itself is compiled against the Open Source core library.
To make this work in practice you should follow these steps:
1. Ensure your CorDapp is designed per :doc:`Structuring a CorDapp <writing-a-cordapp>` and annotated according to :ref:`CorDapp separation <cordapp_separation_ref>`.
@ -61,12 +63,8 @@ To make this work in practice you should follow these steps:
.. note:: It is also important to understand how to manage any dependencies a CorDapp may have on 3rd party libraries and other CorDapps.
Please read :ref:`Setting your dependencies <cordapp_dependencies_ref>` to understand the options and recommendations with regards to correctly Jar'ing CorDapp dependencies.
2. Compile this **CorDapp kernel** Jar once, and then depend on it from your workflows Jar (or Jars - see below). Importantly, if
you want your app to work on both Corda and Corda Enterprise, you must compile this Jar against Corda, not Corda Enterprise.
This is because, in future, we may add additional functionality to Corda Enterprise that is not in Corda and you may inadvertently create a
CorDapp kernel that does not work on Corda open source. Compiling against Corda open source as a matter of course prevents this risk, as well
as preventing the risk that you inadvertently create two different versions of the Jar, which will have different hashes and hence break compatibility
and interoperability.
2. Compile this **CorDapp kernel** Jar once, and then depend on it from your workflows Jar. In terms of Corda depdendencies,this should only
depend on the ``corda-core`` package from the Corda Open Source distribution.
.. note:: As of Corda 4 it is recommended to use :ref:`CorDapp Jar signing <cordapp_build_system_signing_cordapp_jar_ref>` to leverage the new signature constraints functionality.
@ -75,5 +73,9 @@ To make this work in practice you should follow these steps:
to your CorDapp for when it is run on Corda Enterprise (perhaps it uses advanced features of one of the supported enterprise databases or includes
advanced database migration scripts, or some other Enterprise-only feature).
When building a CorDapp against Corda Enterprise, please note that the ``corda-core`` library still needs to come from the open source
distribution, so you will have dependencies on Corda Enterprise and a matching open core distribution. Specifically, any CorDapp targeted
to run on Corda Enterprise should have unit and integration tests using Corda Enterprise.
In summary, structure your app as kernel (contracts, states, dependencies) and workflow (the rest) and be sure to compile the kernel
against Corda open source. You can compile your workflow (Jars) against the distribution of Corda that they target.
@ -26,12 +26,38 @@ Permissions
When accessing the shell (embedded, standalone, via SSH) RPC permissions are required. This is because the shell actually communicates
with the node using RPC calls.
There are several operations that are read-only in nature and granting them should have no impact on the ledger state of the node.
These permissions are:
.. code:: bash
There are also operations that allow starting/killing the flows or even stopping the node as a whole:
* Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed``.
* Starting flows requires ``InvokeRpc.startTrackedFlowDynamic``, ``InvokeRpc.registeredFlows`` and ``InvokeRpc.wellKnownPartyFromX500Name``, as well as a
permission for the flow being started.
* Killing flows (``flow kill``) requires ``InvokeRpc.killFlow``. This currently
allows the user to kill *any* flow, so please be careful when granting it!
Description of RPC operations can be found in :doc:`api-rpc`.
The shell via the local terminal
@ -110,8 +136,7 @@ Run the following command from the terminal:
.. code:: bash
corda-shell [-hvV] [--logging-level=<loggingLevel>] [--password=<password>]
[--sshd-port=<sshdPort>] [--truststore-file=<trustStoreFile>]
[--truststore-type=<trustStoreType>] [--user=<user>] [-a=<host>]
[-c=<cordappDirectory>] [-f=<configFile>] [-o=<commandsDirectory>]
@ -126,8 +151,6 @@ Where:
* ``--port``, ``-p``: The RPC port of the Corda node.
* ``--user=<user>``: The RPC user name.
* ``--password=<password>`` The RPC user password. If not provided it will be prompted for on startup.
* ``--sshd-port=<sshdPort>`` Enables SSH server for shell.
* ``--sshd-hostkey-directory=<sshHostKeyDirectory``: The directory containing the hostkey.pem file for the SSH server.
* ``--truststore-password=<trustStorePassword>``: The password to unlock the TrustStore file.
* ``--truststore-file=<trustStoreFile>``: The path to the TrustStore file.
* ``--truststore-type=<trustStoreType>``: The type of the TrustStore (e.g. JKS).
@ -157,10 +180,6 @@ The format of ``config-file``:
cordapps {
path : /path/to/cordapps/dir
sshd {
enabled : "false"
port : 2223
ssl {
keystore {
@ -177,13 +196,7 @@ The format of ``config-file``:
user : demo
password : demo
Standalone Shell via SSH
The standalone shell can embed an SSH server which redirects interactions via RPC calls to the Corda node.
To run SSH server use ``--sshd-port`` option when starting standalone shell or ``extensions.sshd`` entry in the configuration file.
For connection to SSH refer to `Connecting to the shell`_.
Certain operations (like starting Flows) will require Shell's ``--cordpass-directory`` to be configured correctly (see `Starting the standalone shell`_).
.. note:: SSH server is not supported inside the standalone shell.
Shell Safe Mode
@ -4,6 +4,7 @@ import co.paralleluniverse.strands.Strand
import com.zaxxer.hikari.HikariDataSource
import com.zaxxer.hikari.pool.HikariPool
import com.zaxxer.hikari.util.ConcurrentBag
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
@ -100,7 +101,7 @@ class CordaPersistence(
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
customClassLoader: ClassLoader? = null,
val closeConnection: Boolean = true,
val errorHandler: (t: Throwable) -> Unit = {}
val errorHandler: DatabaseTransaction.(e: Exception) -> Unit = {}
) : Closeable {
companion object {
private val log = contextLogger()
@ -191,17 +192,17 @@ class CordaPersistence(
fun createSession(): Connection {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
val transaction = contextTransaction
try {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
return contextTransaction.connection
} catch (sqlException: SQLException) {
throw sqlException
} catch (persistenceException: PersistenceException) {
throw persistenceException
return transaction.connection
} catch (e: Exception) {
if (e is SQLException || e is PersistenceException) {
throw e
@ -230,18 +231,22 @@ class CordaPersistence(
recoverAnyNestedSQLException: Boolean, statement: DatabaseTransaction.() -> T): T {
val outer = contextTransactionOrNull
try {
return if (outer != null) {
return if (outer != null) {
// we only need to handle errors coming out of inner transactions because,
// a. whenever this code is being executed within the flow state machine, a top level transaction should have
// previously been created by the flow state machine in ActionExecutorImpl#executeCreateTransaction
// b. exceptions coming out from top level transactions are already being handled in CordaPersistence#inTopLevelTransaction
// i.e. roll back and close the transaction
try {
} else {
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
} catch (e: Exception) {
if (e is SQLException || e is PersistenceException || e is HospitalizeFlowException) {
throw e
} catch (sqlException: SQLException) {
throw sqlException
} catch (persistenceException: PersistenceException) {
throw persistenceException
} else {
inTopLevelTransaction(isolationLevel, recoverableFailureTolerance, recoverAnyNestedSQLException, statement)
@ -1,7 +1,7 @@
package net.corda.nodeapi.internal.persistence
import co.paralleluniverse.strands.Strand
import org.hibernate.BaseSessionEventListener
import net.corda.core.CordaRuntimeException
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
@ -51,7 +51,27 @@ class DatabaseTransaction(
private var committed = false
private var closed = false
* Holds the first exception thrown from a series of statements executed in the same [DatabaseTransaction].
* The purpose of this property is to make sure this exception cannot be suppressed in user code.
* The exception will be thrown on the next [commit]. It is used only inside a flow state machine execution.
private var firstExceptionInDatabaseTransaction: Exception? = null
fun setException(e: Exception) {
if (firstExceptionInDatabaseTransaction == null) {
firstExceptionInDatabaseTransaction = e
private fun clearException() {
firstExceptionInDatabaseTransaction = null
fun commit() {
firstExceptionInDatabaseTransaction?.let {
throw DatabaseTransactionException(it)
if (sessionDelegate.isInitialized()) {
@ -66,16 +86,18 @@ class DatabaseTransaction(
if (!connection.isClosed) {
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
if (database.closeConnection) {
contextTransactionOrNull = outerTransaction
if (outerTransaction == null) {
synchronized(this) {
@ -99,3 +121,7 @@ class DatabaseTransaction(
* Wrapper exception, for any exception registered as [DatabaseTransaction.firstExceptionInDatabaseTransaction].
class DatabaseTransactionException(override val cause: Throwable): CordaRuntimeException(cause.message, cause)
@ -80,7 +80,7 @@ class CordaServiceIssueOnceAtStartupTests {
// Without the "secret" property service upon instantiation will be subscribed to lifecycle events which would be unwanted.
// Also do not do this for Notary
val myName = services.myInfo.legalIdentities.single().name
val notaryName = services.networkMapCache.notaryIdentities.single().name
val notaryName = services.networkMapCache.notaryIdentities.firstOrNull()?.name
if(java.lang.Boolean.getBoolean(armedPropName) && myName != notaryName) {
services.register(observer = MyServiceLifecycleObserver())
} else {
@ -63,34 +63,29 @@ class CordaServiceLifecycleFatalTests {
fun `JVM terminates on critical failure`() {
(1..20).forEach {
// Scenario terminates JVM - node should be running out of process
driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()),
notarySpecs = emptyList(),
systemProperties = mapOf(SECRET_PROPERTY_NAME to "true", tempFilePropertyName to tmpFile.absolutePath))) {
val nodeHandle = startNode(providedName = ALICE_NAME).getOrThrow()
logger.info("Rep #$it")
val rpcInterface = nodeHandle.rpc
eventually(duration = 60.seconds) {
assertEquals(readyToThrowMarker, tmpFile.readLines().last())
// Scenario terminates JVM - node should be running out of process
driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()),
notarySpecs = emptyList(),
systemProperties = mapOf(SECRET_PROPERTY_NAME to "true", tempFilePropertyName to tmpFile.absolutePath))) {
val nodeHandle = startNode(providedName = ALICE_NAME).getOrThrow()
val rpcInterface = nodeHandle.rpc
eventually(duration = 60.seconds) {
assertEquals(readyToThrowMarker, tmpFile.readLines().last())
tmpFile.appendText("\n" + goodToThrowMarker)
tmpFile.appendText("\n" + goodToThrowMarker)
// We signalled that it is good to throw which will eventually trigger node shutdown and RPC interface no longer working.
eventually(duration = 30.seconds) {
assertFailsWith(Exception::class) {
try {
} catch (ex: Exception) {
logger.info("Thrown as expected", ex)
throw ex
// We signalled that it is good to throw which will eventually trigger node shutdown and RPC interface no longer working.
eventually(duration = 30.seconds) {
assertFailsWith(Exception::class) {
try {
} catch (ex: Exception) {
logger.info("Thrown as expected", ex)
throw ex
@ -1,9 +1,13 @@
package net.corda.node.services.vault
import co.paralleluniverse.strands.concurrent.Semaphore
import com.r3.dbfailure.workflows.CreateStateFlow
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
import net.corda.core.CordaRuntimeException
import com.r3.dbfailure.workflows.DbListenerService
import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
import com.r3.transactionfailure.workflows.ErrorHandling
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
@ -13,19 +17,20 @@ import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.junit.After
import org.junit.Assert
import org.junit.Test
import rx.exceptions.OnErrorNotImplementedException
import java.lang.IllegalStateException
import java.sql.SQLException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class VaultObserverExceptionTest {
companion object {
@ -43,6 +48,7 @@ class VaultObserverExceptionTest {
DbListenerService.onError = null
@ -50,12 +56,11 @@ class VaultObserverExceptionTest {
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation
fun unhandledSqlExceptionFromVaultObserverGetsHospitatlised() {
fun unhandledSqlExceptionFromVaultObserverGetsHospitalised() {
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
is SQLException -> {
@ -80,58 +85,17 @@ class VaultObserverExceptionTest {
* Throwing a random (non-SQL releated) exception from a vault observer causes the flow to be
* aborted when unhandled in user code
* None exception thrown from a vault observer can be suppressible in the flow that triggered the observer
* because the recording of transaction states failed. The flow will be hospitalized.
* The exception will bring the rx.Observer down.
fun otherExceptionsFromVaultObserverBringFlowDown() {
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith(CordaRuntimeException::class, "Toys out of pram") {
* A random exception from a VaultObserver will bring the Rx Observer down, but can be handled in the flow
* triggering the observer, and the flow will continue successfully (for some values of success)
fun otherExceptionsFromVaultObserverCanBeSuppressedInFlow() {
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
* If the state we are trying to persist triggers a persistence exception, the flow hospital will retry the flow
* and keep it in for observation if errors persist.
fun persistenceExceptionOnCommitGetsRetriedAndThenGetsKeptForObservation() {
var admitted = 0
fun exceptionFromVaultObserverCannotBeSuppressedInFlow() {
var observation = 0
StaffedFlowHospital.onFlowAdmitted.add {
val waitUntilHospitalised = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
@ -139,25 +103,52 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith<TimeoutException> {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow(Duration.of(30, ChronoUnit.SECONDS))
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
Assert.assertTrue("Exception from service has not been to Hospital", admitted > 0)
Assert.assertEquals(1, observation)
* If we have a state causing a database error lined up for persistence, calling jdbConnection() in
* the vault observer will trigger a flush that throws. This will be kept in for observation.
* None runtime exception thrown from a vault observer can be suppressible in the flow that triggered the observer
* because the recording of transaction states failed. The flow will be hospitalized.
* The exception will bring the rx.Observer down.
fun persistenceExceptionOnFlushGetsRetriedAndThenGetsKeptForObservation() {
fun runtimeExceptionFromVaultObserverCannotBeSuppressedInFlow() {
var observation = 0
val waitUntilHospitalised = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
Assert.assertEquals(1, observation)
* If we have a state causing a persistence exception during record transactions (in NodeVaultService#processAndNotify),
* the flow will be kept in for observation.
fun persistenceExceptionDuringRecordTransactionsGetsKeptForObservation() {
var counter = 0
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
is PersistenceException -> {
log.info("Got a PersistentException in the flow hospital count = $counter")
@ -177,7 +168,6 @@ class VaultObserverExceptionTest {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith<TimeoutException>("PersistenceException") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
@ -187,56 +177,15 @@ class VaultObserverExceptionTest {
* If we have a state causing a database error lined up for persistence, calling jdbConnection() in
* the vault observer will trigger a flush that throws.
* Trying to catch and suppress that exception in the flow around the code triggering the vault observer
* does not change the outcome - the first exception in the service will bring the service down and will
* be caught by the flow, but the state machine will error the flow anyway as Corda code threw.
* If we have a state causing a persistence exception during record transactions (in NodeVaultService#processAndNotify),
* trying to catch and suppress that exception inside the flow does protect the flow, but the new
* interceptor will fail the flow anyway. The flow will be kept in for observation.
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInFlow() {
fun persistenceExceptionDuringRecordTransactionsCannotBeSuppressedInFlow() {
var counter = 0
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
is PersistenceException -> {
log.info("Got a PersistentException in the flow hospital count = $counter")
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(
val flowResult = flowHandle.returnValue
assertFailsWith<TimeoutException>("PersistenceException") { flowResult.getOrThrow(30.seconds) }
Assert.assertTrue("Flow has not been to hospital", counter > 0)
* If we have a state causing a persistence exception lined up for persistence, calling jdbConnection() in
* the vault observer will trigger a flush that throws.
* Trying to catch and suppress that exception inside the service does protect the service, but the new
* interceptor will fail the flow anyway. The flow will be kept in for observation if errors persist.
fun persistenceExceptionOnFlushInVaultObserverCannotBeSuppressedInService() {
var counter = 0
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is OnErrorNotImplementedException -> Assert.fail("OnErrorNotImplementedException should be unwrapped")
is PersistenceException -> {
log.info("Got a PersistentException in the flow hospital count = $counter")
@ -253,9 +202,8 @@ class VaultObserverExceptionTest {
val flowHandle = aliceNode.rpc.startFlow(
::Initiator, "EntityManager",
val flowResult = flowHandle.returnValue
assertFailsWith<TimeoutException>("PersistenceException") { flowResult.getOrThrow(30.seconds) }
Assert.assertTrue("Flow has not been to hospital", counter > 0)
@ -310,4 +258,109 @@ class VaultObserverExceptionTest {
* Exceptions thrown from a vault observer ,are now wrapped and rethrown as a HospitalizeFlowException.
* The flow should get hospitalised and any potential following checkpoint should fail.
* In case of a SQLException or PersistenceException, this was already "breaking" the database transaction
* and therefore, the next checkpoint was failing.
fun `attempt to checkpoint, following an error thrown in vault observer which gets supressed in flow, will fail`() {
var counterBeforeFirstCheckpoint = 0
var counterAfterFirstCheckpoint = 0
var counterAfterSecondCheckpoint = 0
ErrorHandling.hookBeforeFirstCheckpoint = { counterBeforeFirstCheckpoint++ }
ErrorHandling.hookAfterFirstCheckpoint = { counterAfterFirstCheckpoint++ }
ErrorHandling.hookAfterSecondCheckpoint = { counterAfterSecondCheckpoint++ }
val waitUntilHospitalised = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
inMemoryDB = false,
startNodesInProcess = true,
isDebug = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.schemas")))) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception
// restart node, see if flow retries from correct checkpoint
startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
// check flow retries from correct checkpoint
assertTrue(counterBeforeFirstCheckpoint == 1)
assertTrue(counterAfterFirstCheckpoint == 2)
assertTrue(counterAfterSecondCheckpoint == 0)
fun `vault observer failing with OnErrorFailedException gets hospitalised`() {
DbListenerService.onError = {
log.info("Error in rx.Observer#OnError! - Observer will fail with OnErrorFailedException")
throw it
var observation = 0
val waitUntilHospitalised = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
Assert.assertEquals(1, observation)
fun `out of memory error halts JVM, on node restart flow retries, and succeeds`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
if (terminated) {
// starting node within the same process this time to take advantage of threads sharing same heap space
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
CreateStateFlow.Initiator.onExitingCall = {
startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = true).getOrThrow()
} else {
throw IllegalStateException("Out of process node is still up and running!")
@ -49,10 +49,10 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.services.ContractUpgradeService
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
@ -61,11 +61,9 @@ import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes
import net.corda.nodeapi.internal.lifecycle.NodeServicesContext
import net.corda.djvm.source.ApiSource
import net.corda.djvm.source.EmptyApi
import net.corda.djvm.source.UserSource
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
import net.corda.node.CordaClock
import net.corda.node.VersionInfo
import net.corda.node.internal.classloading.requireAnnotation
@ -85,7 +83,6 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NodePropertiesStore
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.VaultServiceInternal
@ -121,7 +118,6 @@ import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl
import net.corda.node.services.persistence.PublicKeyToTextConverter
import net.corda.node.services.rpc.CheckpointDumperImpl
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowMonitor
@ -155,8 +151,11 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.NODE_IDENTITY_KEY_ALIAS
import net.corda.nodeapi.internal.cryptoservice.CryptoServiceFactory
import net.corda.nodeapi.internal.cryptoservice.SupportedCryptoServices
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor
import net.corda.nodeapi.internal.lifecycle.NodeServicesContext
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaTransactionSupportImpl
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
@ -178,9 +177,12 @@ import java.sql.Connection
import java.time.Clock
import java.time.Duration
import java.time.format.DateTimeParseException
import java.util.Properties
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MINUTES
import java.util.concurrent.TimeUnit.SECONDS
import java.util.function.Consumer
@ -737,11 +739,17 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private fun createExternalOperationExecutor(numberOfThreads: Int): ExecutorService {
when (numberOfThreads) {
1 -> log.info("Flow external operation executor has $numberOfThreads thread")
else -> log.info("Flow external operation executor has $numberOfThreads threads")
else -> log.info("Flow external operation executor has a max of $numberOfThreads threads")
return Executors.newFixedThreadPool(
// Start with 1 thread and scale up to the configured thread pool size if needed
// Parameters of [ThreadPoolExecutor] based on [Executors.newFixedThreadPool]
return ThreadPoolExecutor(
@ -757,7 +765,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// This sets the Cordapp classloader on the contextClassLoader of the current thread, prior to initializing services
// Needed because of bug CORDA-2653 - some Corda services can utilise third-party libraries that require access to
// the Thread context class loader
val oldContextClassLoader: ClassLoader? = Thread.currentThread().contextClassLoader
try {
Thread.currentThread().contextClassLoader = cordappLoader.appClassLoader
@ -768,14 +775,17 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter of type " +
throw e
} catch (e: ServiceInstantiationException) {
if (e.cause != null) {
log.error("Corda service ${it.name} failed to instantiate. Reason was: ${e.cause?.rootMessage}", e.cause)
} else {
log.error("Corda service ${it.name} failed to instantiate", e)
throw e
} catch (e: Exception) {
log.error("Unable to install Corda service ${it.name}", e)
throw e
} finally {
@ -1154,6 +1164,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T {
return database.transaction {
@ -1247,14 +1258,18 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig,
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
return CordaPersistence(
attributeConverters, customClassLoader,
errorHandler = { t ->
attributeConverters, customClassLoader,
errorHandler = { e ->
// "corrupting" a DatabaseTransaction only inside a flow state machine execution
FlowStateMachineImpl.currentStateMachine()?.let {
// register only the very first exception thrown throughout a chain of logical transactions
fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) {
@ -167,7 +167,7 @@ internal class CordaRPCOpsImpl(
return snapshot
override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid)
override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
@ -86,12 +86,10 @@ class InitialRegistration(val baseDirectory: Path, private val networkRootTrustS
private fun initialRegistration(config: NodeConfiguration) {
// Null checks for [compatibilityZoneURL], [rootTruststorePath] and
// [rootTruststorePassword] have been done in [CmdLineOptions.loadConfig]
val result = attempt { registerWithNetwork(config) }.doOnFailure(Consumer(this::handleRegistrationError))
attempt { registerWithNetwork(config) }.doOnFailure(Consumer(this::handleRegistrationError)).getOrThrow()
if (result.isSuccess) {
// At this point the node registration was successful. We can delete the marker file.
// At this point the node registration was successful. We can delete the marker file.
private fun deleteNodeRegistrationMarker(baseDir: Path) {
@ -10,6 +10,7 @@ import net.corda.core.internal.dependencies
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.core.utilities.seconds
import net.corda.node.services.api.WritableTransactionStorage
import java.util.*
@ -19,7 +20,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
private val logger = flow.logger
override fun downloadDependencies() {
override fun downloadDependencies(batchMode: Boolean) {
logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
@ -39,10 +40,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
// the db contain the identities that were resolved when the transaction was first checked, or should we
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
val nextRequests = LinkedHashSet<SecureHash>(flow.txHashes) // Keep things unique but ordered, for unit test stability.
val nextRequests = LinkedHashSet<SecureHash>(flow.txHashes) // Keep things unique but ordered, for unit test stability.
val topologicalSort = TopologicalSort()
logger.debug { "DbTransactionsResolver.downloadDependencies(batchMode=$batchMode)" }
while (nextRequests.isNotEmpty()) {
logger.debug { "Main fetch loop: size_remaining=${nextRequests.size}" }
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
// graph we're traversing.
@ -76,8 +79,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
// If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure no write
// locks are held going into the next while loop iteration.
// If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure that
// all data is flushed to the database.
if (!suspended) {
@ -93,7 +96,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
val sortedDependencies = checkNotNull(this.sortedDependencies)
logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
for (txId in sortedDependencies) {
// Retrieve and delete the transaction from the unverified store.
@ -23,12 +23,15 @@ import net.corda.nodeapi.internal.loadDevCaTrustStore
import net.corda.nodeapi.internal.registerDevP2pCertificates
import org.slf4j.LoggerFactory
import java.nio.file.Path
import kotlin.math.min
fun configOf(vararg pairs: Pair<String, Any?>): Config = ConfigFactory.parseMap(mapOf(*pairs))
operator fun Config.plus(overrides: Map<String, Any?>): Config = ConfigFactory.parseMap(overrides).withFallback(this)
object ConfigHelper {
private const val CORDA_PROPERTY_PREFIX = "corda."
@ -47,7 +50,9 @@ object ConfigHelper {
// Detect the number of cores
val coreCount = Runtime.getRuntime().availableProcessors()
val multiThreadingConfig = configOf("flowExternalOperationThreadPoolSize" to coreCount.toString())
val multiThreadingConfig = configOf(
"flowExternalOperationThreadPoolSize" to min(coreCount, FLOW_EXTERNAL_OPERATION_THREAD_POOL_SIZE_MAX).toString()
val systemOverrides = ConfigFactory.systemProperties().cordaEntriesOnly()
val environmentOverrides = ConfigFactory.systemEnvironment().cordaEntriesOnly()
@ -102,7 +102,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
* statemachine.
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
val treatableSessionInits = HashMap<StateMachineRunId, InternalSessionInitRecord>()
val recordsPublisher = PublishSubject.create<MedicalRecord>()
private val secureRandom = newSecureRandom()
@ -111,8 +111,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
* The node was unable to initiate the [InitialSessionMessage] from [sender].
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
val id = event.flowId
val time = clock.instant()
val id = UUID.randomUUID()
val outcome = if (error is SessionRejectException.UnknownClass) {
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
// installed on restart, at which point the message will be able proceed as normal. If not then it will need
@ -154,7 +154,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
* to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker
* so that it never gets redelivered.
fun dropSessionInit(id: UUID): Boolean {
fun dropSessionInit(id: StateMachineRunId): Boolean {
val (sessionMessage, event, publicRecord) = mutex.locked {
treatableSessionInits.remove(id) ?: return false
@ -339,7 +339,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
override val outcome: Outcome) : MedicalRecord()
/** Medical record for a session initiation that was unsuccessful. */
data class SessionInit(val id: UUID,
data class SessionInit(val id: StateMachineRunId,
override val time: Instant,
override val outcome: Outcome,
val initiatorFlowClassName: String,
@ -6,6 +6,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransactionException
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom
@ -61,11 +62,23 @@ class TransitionExecutorImpl(
} else {
log.info("Error while executing $action, with event $event, erroring state", exception)
// distinguish between a DatabaseTransactionException and an actual StateTransitionException
val stateTransitionOrDatabaseTransactionException =
if (exception is DatabaseTransactionException) {
// if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
// it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
// it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
} else {
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
StateTransitionException(action, event, exception)
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
listOf(FlowError(secureRandom.nextLong(), StateTransitionException(action, event, exception)))
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
isFlowResumed = false
@ -5,6 +5,7 @@ import co.paralleluniverse.strands.Strand
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.containsAny
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServicesForResolution
@ -26,11 +27,13 @@ import rx.Observable
import rx.exceptions.OnErrorNotImplementedException
import rx.subjects.PublishSubject
import java.security.PublicKey
import java.sql.SQLException
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
import javax.persistence.PersistenceException
import javax.persistence.Tuple
import javax.persistence.criteria.CriteriaBuilder
import javax.persistence.criteria.CriteriaUpdate
@ -393,12 +396,25 @@ class NodeVaultService(
persistentStateService.persist(vaultUpdate.produced + vaultUpdate.references)
try {
} catch (e: OnErrorNotImplementedException) {
log.warn("Caught an Rx.OnErrorNotImplementedException " +
"- caused by an exception in an RX observer that was unhandled " +
"- the observer has been unsubscribed! The underlying exception will be rethrown.", e)
// if the observer code threw, unwrap their exception from the RX wrapper
throw e.cause ?: e
} catch (e: Exception) {
// exception thrown here will cause the recording of transaction states to the vault being rolled back
// it could cause the ledger go into an inconsistent state, therefore we should hospitalise this flow
// observer code should either be fixed or ignored and have the flow retry from previous checkpoint
"Failed to record transaction states locally " +
"- the node could be now in an inconsistent state with other peers and/or the notary " +
"- hospitalising the flow ", e
throw (e as? OnErrorNotImplementedException)?.let {
it.cause?.let { wrapped ->
if (wrapped is SQLException || wrapped is PersistenceException) {
} else {
} ?: HospitalizeFlowException(e)
@ -1,4 +1,4 @@
package net.corda.verification
package net.corda.verification.contracts
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.CommandData
@ -1,4 +1,4 @@
package net.corda.verification
package net.corda.verification.contracts
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
@ -1,4 +1,4 @@
package net.corda.verification
package net.corda.verification.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
@ -7,6 +7,9 @@ import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap
import net.corda.verification.contracts.CommsTestCommand
import net.corda.verification.contracts.CommsTestContract
import net.corda.verification.contracts.CommsTestState
@ -1,4 +1,4 @@
package net.corda.verification
package net.corda.verification.flows
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
@ -6,6 +6,9 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import co.paralleluniverse.fibers.Suspendable
import net.corda.verification.contracts.NotaryTestCommand
import net.corda.verification.contracts.NotaryTestContract
import net.corda.verification.contracts.NotaryTestState
class TestNotaryFlow : FlowLogic<String>() {
@ -4,7 +4,7 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.TestIdentity
import net.corda.verification.TestCommsFlowInitiator
import net.corda.verification.flows.TestCommsFlowInitiator
import org.junit.Assert
import org.junit.Test
@ -0,0 +1,9 @@
package net.corda.serialization.djvm.deserializers
import java.util.function.Predicate
class CheckEnum : Predicate<Class<*>> {
override fun test(clazz: Class<*>): Boolean {
return clazz.isEnum
@ -52,11 +52,14 @@ import java.math.BigInteger
import java.util.Date
import java.util.UUID
import java.util.function.Function
import java.util.function.Predicate
class SandboxSerializationSchemeBuilder(
private val classLoader: SandboxClassLoader,
private val sandboxBasicInput: Function<in Any?, out Any?>,
private val rawTaskFactory: Function<in Any, out Function<in Any?, out Any?>>,
private val taskFactory: Function<Class<out Function<*, *>>, out Function<in Any?, out Any?>>,
private val predicateFactory: Function<Class<out Predicate<*>>, out Predicate<in Any?>>,
private val customSerializerClassNames: Set<String>,
private val serializationWhitelistNames: Set<String>,
private val serializerFactoryFactory: SerializerFactoryFactory
@ -66,7 +69,6 @@ class SandboxSerializationSchemeBuilder(
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
val taskFactory = rawTaskFactory.compose(classLoader.createSandboxFunction())
return serializerFactoryFactory.make(context).apply {
register(SandboxBitSetSerializer(classLoader, taskFactory, this))
register(SandboxCertPathSerializer(classLoader, taskFactory, this))
@ -100,7 +102,7 @@ class SandboxSerializationSchemeBuilder(
register(SandboxCharacterSerializer(classLoader, sandboxBasicInput))
register(SandboxCollectionSerializer(classLoader, taskFactory, this))
register(SandboxMapSerializer(classLoader, taskFactory, this))
register(SandboxEnumSerializer(classLoader, taskFactory, this))
register(SandboxEnumSerializer(classLoader, taskFactory, predicateFactory, this))
register(SandboxPublicKeySerializer(classLoader, taskFactory))
register(SandboxToStringSerializer(BigDecimal::class.java, classLoader, rawTaskFactory, sandboxBasicInput))
register(SandboxToStringSerializer(BigInteger::class.java, classLoader, rawTaskFactory, sandboxBasicInput))
@ -14,9 +14,11 @@ import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.amqp.SerializerFactoryFactory
import net.corda.serialization.internal.amqp.WhitelistBasedTypeModelConfiguration
import net.corda.serialization.internal.amqp.createClassCarpenter
import net.corda.serialization.internal.model.BaseLocalTypes
import net.corda.serialization.internal.model.ClassCarpentingTypeLoader
import net.corda.serialization.internal.model.ConfigurableLocalTypeModel
import net.corda.serialization.internal.model.SchemaBuildingRemoteTypeCarpenter
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
import net.corda.serialization.internal.model.TypeLoader
import net.corda.serialization.internal.model.TypeModellingFingerPrinter
import java.lang.Boolean
@ -36,7 +38,8 @@ import java.util.function.Predicate
* This has all been lovingly copied from [SerializerFactoryBuilder].
class SandboxSerializerFactoryFactory(
private val primitiveSerializerFactory: Function<Class<*>, AMQPSerializer<Any>>
private val primitiveSerializerFactory: Function<Class<*>, AMQPSerializer<Any>>,
private val localTypes: BaseLocalTypes
) : SerializerFactoryFactory {
override fun make(context: SerializationContext): SerializerFactory {
@ -65,7 +68,11 @@ class SandboxSerializerFactoryFactory(
val localTypeModel = ConfigurableLocalTypeModel(
WhitelistBasedTypeModelConfiguration(context.whitelist, customSerializerRegistry)
whitelist = context.whitelist,
customSerializerRegistry = customSerializerRegistry,
baseTypes = localTypes
val fingerPrinter = TypeModellingFingerPrinter(customSerializerRegistry)
@ -7,14 +7,21 @@ import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.utilities.ByteSequence
import net.corda.djvm.rewiring.createRawPredicateFactory
import net.corda.djvm.rewiring.createSandboxPredicate
import net.corda.djvm.rewiring.SandboxClassLoader
import net.corda.serialization.djvm.deserializers.CheckEnum
import net.corda.serialization.djvm.deserializers.DescribeEnum
import net.corda.serialization.djvm.serializers.PrimitiveSerializer
import net.corda.serialization.internal.GlobalTransientClassWhiteList
import net.corda.serialization.internal.SerializationContextImpl
import net.corda.serialization.internal.SerializationFactoryImpl
import net.corda.serialization.internal.amqp.AMQPSerializer
import net.corda.serialization.internal.amqp.amqpMagic
import net.corda.serialization.internal.model.BaseLocalTypes
import java.util.EnumSet
import java.util.function.Function
import java.util.function.Predicate
inline fun SandboxClassLoader.toSandboxAnyClass(clazz: Class<*>): Class<Any> {
@ -42,20 +49,40 @@ fun createSandboxSerializationEnv(
encoding = null
val rawTaskFactory = classLoader.createRawTaskFactory()
val sandboxBasicInput = classLoader.createBasicInput()
val rawTaskFactory = classLoader.createRawTaskFactory()
val taskFactory = rawTaskFactory.compose(classLoader.createSandboxFunction())
val predicateFactory = classLoader.createRawPredicateFactory().compose(classLoader.createSandboxPredicate())
val primitiveSerializerFactory: Function<Class<*>, AMQPSerializer<Any>> = Function { clazz ->
PrimitiveSerializer(clazz, sandboxBasicInput)
val isEnumPredicate = predicateFactory.apply(CheckEnum::class.java) as Predicate<Class<*>>
val enumConstants = taskFactory.apply(DescribeEnum::class.java) as Function<Class<*>, Array<out Any>>
val sandboxLocalTypes = BaseLocalTypes(
collectionClass = classLoader.toSandboxClass(Collection::class.java),
enumSetClass = classLoader.toSandboxClass(EnumSet::class.java),
exceptionClass = classLoader.toSandboxClass(Exception::class.java),
mapClass = classLoader.toSandboxClass(Map::class.java),
stringClass = classLoader.toSandboxClass(String::class.java),
isEnum = isEnumPredicate,
enumConstants = enumConstants
val schemeBuilder = SandboxSerializationSchemeBuilder(
classLoader = classLoader,
sandboxBasicInput = sandboxBasicInput,
rawTaskFactory = rawTaskFactory,
taskFactory = taskFactory,
predicateFactory = predicateFactory,
customSerializerClassNames = customSerializerClassNames,
serializationWhitelistNames = serializationWhitelistNames,
serializerFactoryFactory = SandboxSerializerFactoryFactory(primitiveSerializerFactory)
serializerFactoryFactory = SandboxSerializerFactoryFactory(
primitiveSerializerFactory = primitiveSerializerFactory,
localTypes = sandboxLocalTypes
val factory = SerializationFactoryImpl(mutableMapOf()).apply {
@ -35,9 +35,9 @@ class SandboxCollectionSerializer(
private val unsupportedTypes: Set<Class<Any>> = listOf(
).map {
).mapTo(LinkedHashSet()) {
// The order matters here - the first match should be the most specific one.
// Kotlin preserves the ordering for us by associating into a LinkedHashMap.
@ -95,9 +95,9 @@ private class ConcreteCollectionSerializer(
override val typeDescriptor: Symbol by lazy {
observedType = declaredType.rawType,
observedType = declaredType,
typeIdentifier = TypeIdentifier.forGenericType(declaredType),
elementType =factory.getTypeInformation(declaredType.actualTypeArguments[0])
elementType = factory.getTypeInformation(declaredType.actualTypeArguments[0])
@ -109,7 +109,7 @@ private class ConcreteCollectionSerializer(
context: SerializationContext
): Any {
val inboundType = type.actualTypeArguments[0]
return ifThrowsAppend({ type.typeName }) {
return ifThrowsAppend(type::getTypeName) {
val args = (obj as List<*>).map {
input.readObjectOrNull(redescribe(it, inboundType), schemas, inboundType, context)
@ -2,6 +2,7 @@ package net.corda.serialization.djvm.serializers
import net.corda.core.serialization.SerializationContext
import net.corda.djvm.rewiring.SandboxClassLoader
import net.corda.serialization.djvm.deserializers.CheckEnum
import net.corda.serialization.djvm.deserializers.DescribeEnum
import net.corda.serialization.djvm.toSandboxAnyClass
import net.corda.serialization.internal.amqp.AMQPNotSerializableException
@ -19,23 +20,32 @@ import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.codec.Data
import java.lang.reflect.Type
import java.util.function.Function
import java.util.function.Predicate
class SandboxEnumSerializer(
classLoader: SandboxClassLoader,
taskFactory: Function<Class<out Function<*, *>>, out Function<in Any?, out Any?>>,
predicateFactory: Function<Class<out Predicate<*>>, out Predicate<in Any?>>,
private val localFactory: LocalSerializerFactory
) : CustomSerializer.Implements<Any>(clazz = classLoader.toSandboxAnyClass(Enum::class.java)) {
private val describer: Function<Class<*>, Array<Any>>
private val describeEnum: Function<Class<*>, Array<Any>>
= taskFactory.apply(DescribeEnum::class.java) as Function<Class<*>, Array<Any>>
private val isEnum: Predicate<Class<*>>
= predicateFactory.apply(CheckEnum::class.java) as Predicate<Class<*>>
override val schemaForDocumentation: Schema = Schema(emptyList())
override fun isSerializerFor(clazz: Class<*>): Boolean {
return super.isSerializerFor(clazz) && isEnum.test(clazz)
override fun specialiseFor(declaredType: Type): AMQPSerializer<Any>? {
if (declaredType !is Class<*>) {
return null
val members = describer.apply(declaredType)
val members = describeEnum.apply(declaredType)
return ConcreteEnumSerializer(declaredType, members, localFactory)
@ -68,7 +78,7 @@ private class ConcreteEnumSerializer(
members.map { it.toString() },
@ -85,9 +85,9 @@ private class ConcreteMapSerializer(
override val typeDescriptor: Symbol by lazy {
observedType = declaredType.rawType,
observedType = declaredType,
typeIdentifier = TypeIdentifier.forGenericType(declaredType),
keyType =factory.getTypeInformation(declaredType.actualTypeArguments[0]),
keyType = factory.getTypeInformation(declaredType.actualTypeArguments[0]),
valueType = factory.getTypeInformation(declaredType.actualTypeArguments[1])
@ -101,7 +101,7 @@ private class ConcreteMapSerializer(
): Any {
val inboundKeyType = type.actualTypeArguments[0]
val inboundValueType = type.actualTypeArguments[1]
return ifThrowsAppend({ type.typeName }) {
return ifThrowsAppend(type::getTypeName) {
val entries = (obj as Map<*, *>).map {
input.readObjectOrNull(redescribe(it.key, inboundKeyType), schemas, inboundKeyType, context),
@ -7,6 +7,10 @@ import net.corda.serialization.djvm.SandboxType.KOTLIN
import net.corda.serialization.internal.SerializationFactoryImpl
import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.model.LocalTypeInformation
import net.corda.serialization.internal.model.LocalTypeInformation.ACollection
import net.corda.serialization.internal.model.LocalTypeInformation.AnEnum
import net.corda.serialization.internal.model.LocalTypeInformation.AMap
import net.corda.serialization.internal.model.LocalTypeInformation.Abstract
import net.corda.serialization.internal.model.LocalTypeInformation.Atomic
import net.corda.serialization.internal.model.LocalTypeInformation.Opaque
import org.apache.qpid.proton.amqp.Decimal128
@ -19,11 +23,10 @@ import org.apache.qpid.proton.amqp.UnsignedLong
import org.apache.qpid.proton.amqp.UnsignedShort
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Date
import java.util.EnumSet
import java.util.UUID
class LocalTypeModelTest : TestBase(KOTLIN) {
private val serializerFactory: SerializerFactory get() {
val factory = SerializationFactory.defaultFactory as SerializationFactoryImpl
@ -47,7 +50,7 @@ class LocalTypeModelTest : TestBase(KOTLIN) {
fun testString() = sandbox {
@ -158,4 +161,31 @@ class LocalTypeModelTest : TestBase(KOTLIN) {
fun testCollection() = sandbox {
fun testEnum() = sandbox {
fun testEnumSet() = sandbox {
val exampleEnumSet = EnumSet.noneOf(ExampleEnum::class.java)
fun testMap() = sandbox {
@ -71,27 +71,43 @@ abstract class TestBase(type: SandboxType) {
SandboxType.JAVA -> TESTING_LIBRARIES.filter { isDirectory(it) }
fun sandbox(action: SandboxRuntimeContext.() -> Unit) {
return sandbox(WARNING, emptySet(), emptySet(), action)
inline fun sandbox(crossinline action: SandboxRuntimeContext.() -> Unit) {
sandbox(Consumer { ctx -> action(ctx) })
fun sandbox(visibleAnnotations: Set<Class<out Annotation>>, action: SandboxRuntimeContext.() -> Unit) {
return sandbox(WARNING, visibleAnnotations, emptySet(), action)
fun sandbox(action: Consumer<SandboxRuntimeContext>) {
sandbox(WARNING, emptySet(), emptySet(), action)
inline fun sandbox(visibleAnnotations: Set<Class<out Annotation>>, crossinline action: SandboxRuntimeContext.() -> Unit) {
sandbox(visibleAnnotations, Consumer { ctx -> action(ctx) })
fun sandbox(visibleAnnotations: Set<Class<out Annotation>>, action: Consumer<SandboxRuntimeContext>) {
sandbox(WARNING, visibleAnnotations, emptySet(), action)
inline fun sandbox(
visibleAnnotations: Set<Class<out Annotation>>,
sandboxOnlyAnnotations: Set<String>,
crossinline action: SandboxRuntimeContext.() -> Unit
) {
sandbox(visibleAnnotations, sandboxOnlyAnnotations, Consumer { ctx -> action(ctx) })
fun sandbox(
visibleAnnotations: Set<Class<out Annotation>>,
sandboxOnlyAnnotations: Set<String>,
action: SandboxRuntimeContext.() -> Unit
action: Consumer<SandboxRuntimeContext>
) {
return sandbox(WARNING, visibleAnnotations, sandboxOnlyAnnotations, action)
sandbox(WARNING, visibleAnnotations, sandboxOnlyAnnotations, action)
fun sandbox(
minimumSeverityLevel: Severity,
visibleAnnotations: Set<Class<out Annotation>>,
sandboxOnlyAnnotations: Set<String>,
action: SandboxRuntimeContext.() -> Unit
action: Consumer<SandboxRuntimeContext>
) {
var thrownException: Throwable? = null
thread(start = false) {
@ -100,9 +116,7 @@ abstract class TestBase(type: SandboxType) {
})).use(Consumer { ctx ->
}.apply {
uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { _, ex ->
@ -4,6 +4,8 @@ import net.corda.core.CordaThrowable
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.serialization.internal.model.DefaultCacheProvider
import net.corda.serialization.internal.model.TypeIdentifier
import java.lang.reflect.Type
@ -93,10 +95,10 @@ class CachingCustomSerializerRegistry(
* that expects to find getters and a constructor with a parameter for each property.
override fun register(customSerializer: CustomSerializer<out Any>) {
logger.trace("action=\"Registering custom serializer\", class=\"${customSerializer.type}\"")
logger.trace { "action=\"Registering custom serializer\", class=\"${customSerializer.type}\"" }
if (customSerializersCache.isNotEmpty()) {
logger.warn("Attempting to register custom serializer $customSerializer.type} in an active cache." +
logger.warn("Attempting to register custom serializer ${customSerializer.type} in an active cache." +
"All serializers should be registered before the cache comes into use.")
@ -119,7 +121,7 @@ class CachingCustomSerializerRegistry(
override fun registerExternal(customSerializer: CorDappCustomSerializer) {
logger.trace("action=\"Registering external serializer\", class=\"${customSerializer.type}\"")
logger.trace { "action=\"Registering external serializer\", class=\"${customSerializer.type}\"" }
if (customSerializersCache.isNotEmpty()) {
logger.warn("Attempting to register custom serializer ${customSerializer.type} in an active cache." +
@ -153,8 +155,7 @@ class CachingCustomSerializerRegistry(
(declaredSuperClass == null
|| !customSerializer.isSerializerFor(declaredSuperClass)
|| !customSerializer.revealSubclassesInSchema) -> {
logger.debug("action=\"Using custom serializer\", class=${clazz.typeName}, " +
logger.debug { "action=\"Using custom serializer\", class=${clazz.typeName}, declaredType=${declaredType.typeName}" }
customSerializer as? AMQPSerializer<Any>
@ -7,6 +7,7 @@ import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.serialization.internal.*
import net.corda.serialization.internal.model.TypeIdentifier
import org.apache.qpid.proton.amqp.Binary
@ -119,7 +120,7 @@ class DeserializationInput constructor(
des {
val envelope = getEnvelope(bytes, context.encodingWhitelist)
logger.trace("deserialize blob scheme=\"${envelope.schema}\"")
logger.trace { "deserialize blob scheme=\"${envelope.schema}\"" }
doReadObject(envelope, clazz, context)
@ -3,8 +3,10 @@ package net.corda.serialization.internal.amqp
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.internal.MissingSerializerException
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.serialization.internal.model.*
import java.io.NotSerializableException
import java.util.Collections.singletonList
* A factory that knows how to create serializers to deserialize values sent to us by remote parties.
@ -65,7 +67,7 @@ class DefaultRemoteSerializerFactory(
): AMQPSerializer<Any> =
// If we have seen this descriptor before, we assume we have seen everything in this schema before.
descriptorBasedSerializerRegistry.getOrBuild(typeDescriptor) {
logger.trace("get Serializer descriptor=$typeDescriptor")
logger.trace { "get Serializer descriptor=$typeDescriptor" }
// Interpret all of the types in the schema into RemoteTypeInformation, and reflect that into LocalTypeInformation.
val remoteTypeInformationMap = remoteTypeModel.interpret(schema)
@ -75,7 +77,7 @@ class DefaultRemoteSerializerFactory(
// This will save us having to re-interpret the entire schema on re-entry when deserialising individual property values.
val serializers = reflected.mapValues { (descriptor, remoteLocalPair) ->
descriptorBasedSerializerRegistry.getOrBuild(descriptor) {
getUncached(remoteLocalPair.remoteTypeInformation, remoteLocalPair.localTypeInformation)
getUncached(remoteLocalPair.remoteTypeInformation, remoteLocalPair.localTypeInformation, context)
@ -88,7 +90,8 @@ class DefaultRemoteSerializerFactory(
private fun getUncached(
remoteTypeInformation: RemoteTypeInformation,
localTypeInformation: LocalTypeInformation
localTypeInformation: LocalTypeInformation,
context: SerializationContext
): AMQPSerializer<Any> {
val remoteDescriptor = remoteTypeInformation.typeDescriptor
@ -109,6 +112,13 @@ class DefaultRemoteSerializerFactory(
evolutionSerializerFactory.getEvolutionSerializer(remoteTypeInformation, localTypeInformation)
?: localSerializer
// The type descriptors are never going to match when we deserialise into
// the DJVM's sandbox, but we don't want the node logs to fill up with
// Big 'n Scary warnings either. Assume that the local serializer is fine
// provided the local type is the same one we expect when loading the
// remote class.
remoteTypeInformation.isCompatibleWith(localTypeInformation, context) -> localSerializer
// Descriptors don't match, and something is probably broken, but we let the framework do what it can with the local
// serialiser (BlobInspectorTest uniquely breaks if we throw an exception here, and passes if we just warn and continue).
else -> {
@ -134,7 +144,7 @@ ${localTypeInformation.prettyPrint(false)}
return remoteInformation.mapValues { (_, remoteInformation) ->
RemoteAndLocalTypeInformation(remoteInformation, localInformationByIdentifier[remoteInformation.typeIdentifier]!!)
RemoteAndLocalTypeInformation(remoteInformation, localInformationByIdentifier.getValue(remoteInformation.typeIdentifier))
@ -145,7 +155,16 @@ ${localTypeInformation.prettyPrint(false)}
private fun RemoteTypeInformation.isDeserialisableWithoutEvolutionTo(localTypeInformation: LocalTypeInformation) =
this is RemoteTypeInformation.Parameterised &&
this is RemoteTypeInformation.Parameterised &&
(localTypeInformation is LocalTypeInformation.ACollection ||
localTypeInformation is LocalTypeInformation.AMap)
private fun RemoteTypeInformation.isCompatibleWith(
localTypeInformation: LocalTypeInformation,
context: SerializationContext
): Boolean {
val localTypes = typeLoader.load(singletonList(this), context)
return localTypes.size == 1
&& localTypeInformation.observedType == localTypes.values.first()
@ -3,9 +3,6 @@ package net.corda.serialization.internal.amqp
import net.corda.core.KeepForDJVM
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.CordaSerializationTransformRename
import net.corda.serialization.internal.NotSerializableDetailedException
import net.corda.serialization.internal.model.EnumTransforms
import net.corda.serialization.internal.model.InvalidEnumTransformsException
import net.corda.serialization.internal.model.LocalTypeInformation
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.codec.DescribedTypeConstructor
@ -2,18 +2,27 @@ package net.corda.serialization.internal.amqp
import com.google.common.primitives.Primitives
import net.corda.core.serialization.ClassWhitelist
import net.corda.serialization.internal.model.BaseLocalTypes
import net.corda.serialization.internal.model.LocalTypeModelConfiguration
import org.apache.qpid.proton.amqp.*
import java.lang.reflect.Type
import java.util.*
import java.util.Date
import java.util.EnumSet
import java.util.UUID
import java.util.function.Function
import java.util.function.Predicate
* [LocalTypeModelConfiguration] based on a [ClassWhitelist]
class WhitelistBasedTypeModelConfiguration(
private val whitelist: ClassWhitelist,
private val customSerializerRegistry: CustomSerializerRegistry)
: LocalTypeModelConfiguration {
private val customSerializerRegistry: CustomSerializerRegistry,
override val baseTypes: BaseLocalTypes
) : LocalTypeModelConfiguration {
constructor(whitelist: ClassWhitelist, customSerializerRegistry: CustomSerializerRegistry)
: this(whitelist, customSerializerRegistry, DEFAULT_BASE_TYPES)
override fun isExcluded(type: Type): Boolean = whitelist.isNotWhitelisted(type.asClass())
override fun isOpaque(type: Type): Boolean = Primitives.unwrap(type.asClass()) in opaqueTypes ||
customSerializerRegistry.findCustomSerializer(type.asClass(), type) != null
@ -42,4 +51,14 @@ private val opaqueTypes = setOf(
private val DEFAULT_BASE_TYPES = BaseLocalTypes(
collectionClass = Collection::class.java,
enumSetClass = EnumSet::class.java,
exceptionClass = Exception::class.java,
mapClass = Map::class.java,
stringClass = String::class.java,
isEnum = Predicate { clazz -> clazz.isEnum },
enumConstants = Function { clazz -> clazz.enumConstants }
@ -7,11 +7,24 @@ import net.corda.core.serialization.ConstructorForDeserialization
import net.corda.core.serialization.DeprecatedConstructorForDeserialization
import net.corda.serialization.internal.NotSerializableDetailedException
import net.corda.serialization.internal.amqp.*
import net.corda.serialization.internal.model.LocalTypeInformation.Abstract
import net.corda.serialization.internal.model.LocalTypeInformation.AnArray
import net.corda.serialization.internal.model.LocalTypeInformation.AnEnum
import net.corda.serialization.internal.model.LocalTypeInformation.AnInterface
import net.corda.serialization.internal.model.LocalTypeInformation.Atomic
import net.corda.serialization.internal.model.LocalTypeInformation.ACollection
import net.corda.serialization.internal.model.LocalTypeInformation.AMap
import net.corda.serialization.internal.model.LocalTypeInformation.Composable
import net.corda.serialization.internal.model.LocalTypeInformation.Cycle
import net.corda.serialization.internal.model.LocalTypeInformation.NonComposable
import net.corda.serialization.internal.model.LocalTypeInformation.Opaque
import net.corda.serialization.internal.model.LocalTypeInformation.Singleton
import net.corda.serialization.internal.model.LocalTypeInformation.Top
import net.corda.serialization.internal.model.LocalTypeInformation.Unknown
import java.io.NotSerializableException
import java.lang.reflect.Method
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.*
import kotlin.collections.LinkedHashMap
import kotlin.reflect.KFunction
import kotlin.reflect.full.findAnnotation
@ -35,8 +48,10 @@ import kotlin.reflect.jvm.javaType
internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
var resolutionContext: Type? = null,
var visited: Set<TypeIdentifier> = emptySet(),
val cycles: MutableList<LocalTypeInformation.Cycle> = mutableListOf(),
val cycles: MutableList<Cycle> = mutableListOf(),
var validateProperties: Boolean = true) {
private val baseTypes = lookup.baseTypes
* If we are examining the type of a read-only property, or a type flagged as [Opaque], then we do not need to warn
* if the [LocalTypeInformation] for that type (or any of its related types) is [LocalTypeInformation.NonComposable].
@ -55,7 +70,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
* Recursively build [LocalTypeInformation] for the given [Type] and [TypeIdentifier]
fun build(type: Type, typeIdentifier: TypeIdentifier): LocalTypeInformation =
if (typeIdentifier in visited) LocalTypeInformation.Cycle(type, typeIdentifier).apply { cycles.add(this) }
if (typeIdentifier in visited) Cycle(type, typeIdentifier).apply { cycles.add(this) }
else lookup.findOrBuild(type, typeIdentifier) { isOpaque ->
val previous = visited
try {
@ -78,15 +93,16 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun buildIfNotFound(type: Type, typeIdentifier: TypeIdentifier, isOpaque: Boolean): LocalTypeInformation {
val rawType = type.asClass()
return when (typeIdentifier) {
is TypeIdentifier.TopType -> LocalTypeInformation.Top
is TypeIdentifier.UnknownType -> LocalTypeInformation.Unknown
is TypeIdentifier.TopType -> Top
is TypeIdentifier.UnknownType -> Unknown
is TypeIdentifier.Unparameterised,
is TypeIdentifier.Erased -> buildForClass(rawType, typeIdentifier, isOpaque)
is TypeIdentifier.ArrayOf -> {
is TypeIdentifier.Parameterised -> buildForParameterised(rawType, type as ParameterizedType, typeIdentifier, isOpaque)
@ -94,38 +110,41 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun buildForClass(type: Class<*>, typeIdentifier: TypeIdentifier, isOpaque: Boolean): LocalTypeInformation = withContext(type) {
when {
Collection::class.java.isAssignableFrom(type) &&
!EnumSet::class.java.isAssignableFrom(type) -> LocalTypeInformation.ACollection(type, typeIdentifier, LocalTypeInformation.Unknown)
Map::class.java.isAssignableFrom(type) -> LocalTypeInformation.AMap(type, typeIdentifier, LocalTypeInformation.Unknown, LocalTypeInformation.Unknown)
type == String::class.java -> LocalTypeInformation.Atomic(String::class.java, typeIdentifier)
type.kotlin.javaPrimitiveType != null ->LocalTypeInformation.Atomic(type, typeIdentifier)
type.isEnum -> LocalTypeInformation.AnEnum(
baseTypes.collectionClass.isAssignableFrom(type) &&
!baseTypes.enumSetClass.isAssignableFrom(type) -> ACollection(type, typeIdentifier, Unknown)
baseTypes.mapClass.isAssignableFrom(type) -> AMap(type, typeIdentifier, Unknown, Unknown)
type === baseTypes.stringClass -> Atomic(type, typeIdentifier)
type.kotlin.javaPrimitiveType != null -> Atomic(type, typeIdentifier)
baseTypes.isEnum.test(type) -> baseTypes.enumConstants.apply(type).let { enumConstants ->
type.enumConstants.map { it.toString() },
type.kotlinObjectInstance != null -> LocalTypeInformation.Singleton(
getEnumTransforms(type, enumConstants)
type.kotlinObjectInstance != null -> Singleton(
type.isInterface -> buildInterface(type, typeIdentifier, emptyList())
type.isAbstractClass -> buildAbstract(type, typeIdentifier, emptyList())
isOpaque -> LocalTypeInformation.Opaque(
isOpaque -> Opaque(
suppressValidation { buildNonAtomic(type, type, typeIdentifier, emptyList()) })
Exception::class.java.isAssignableFrom(type.asClass()) -> suppressValidation {
baseTypes.exceptionClass.isAssignableFrom(type.asClass()) -> suppressValidation {
buildNonAtomic(type, type, typeIdentifier, emptyList())
else -> buildNonAtomic(type, type, typeIdentifier, emptyList())
private fun getEnumTransforms(type: Class<*>): EnumTransforms {
private fun getEnumTransforms(type: Class<*>, enumConstants: Array<out Any>): EnumTransforms {
try {
val constants = type.enumConstants.asSequence().mapIndexed { index, constant -> constant.toString() to index }.toMap()
val constants = enumConstants.asSequence().mapIndexed { index, constant -> constant.toString() to index }.toMap()
return EnumTransforms.build(TransformsAnnotationProcessor.getTransformsSchema(type), constants)
} catch (e: InvalidEnumTransformsException) {
throw NotSerializableDetailedException(type.name, e.message!!)
@ -138,16 +157,16 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
typeIdentifier: TypeIdentifier.Parameterised,
isOpaque: Boolean): LocalTypeInformation = withContext(type) {
when {
Collection::class.java.isAssignableFrom(rawType) &&
!EnumSet::class.java.isAssignableFrom(rawType) ->
LocalTypeInformation.ACollection(type, typeIdentifier, buildTypeParameterInformation(type)[0])
Map::class.java.isAssignableFrom(rawType) -> {
baseTypes.collectionClass.isAssignableFrom(rawType) &&
!baseTypes.enumSetClass.isAssignableFrom(rawType) ->
ACollection(type, typeIdentifier, buildTypeParameterInformation(type)[0])
baseTypes.mapClass.isAssignableFrom(rawType) -> {
val (keyType, valueType) = buildTypeParameterInformation(type)
LocalTypeInformation.AMap(type, typeIdentifier, keyType, valueType)
AMap(type, typeIdentifier, keyType, valueType)
rawType.isInterface -> buildInterface(type, typeIdentifier, buildTypeParameterInformation(type))
rawType.isAbstractClass -> buildAbstract(type, typeIdentifier, buildTypeParameterInformation(type))
isOpaque -> LocalTypeInformation.Opaque(rawType,
isOpaque -> Opaque(rawType,
suppressValidation { buildNonAtomic(rawType, type, typeIdentifier, buildTypeParameterInformation(type)) })
else -> buildNonAtomic(rawType, type, typeIdentifier, buildTypeParameterInformation(type))
@ -155,23 +174,25 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun buildAbstract(type: Type, typeIdentifier: TypeIdentifier,
typeParameters: List<LocalTypeInformation>): LocalTypeInformation.Abstract =
typeParameters: List<LocalTypeInformation>): Abstract =
private fun buildInterface(type: Type, typeIdentifier: TypeIdentifier,
typeParameters: List<LocalTypeInformation>): LocalTypeInformation.AnInterface =
typeParameters: List<LocalTypeInformation>): AnInterface =
private inline fun <T> withContext(newContext: Type, block: LocalTypeInformationBuilder.() -> T): T {
val previous = resolutionContext
@ -196,11 +217,11 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun buildNonAtomic(rawType: Class<*>, type: Type, typeIdentifier: TypeIdentifier, typeParameterInformation: List<LocalTypeInformation>): LocalTypeInformation {
val superclassInformation = buildSuperclassInformation(type)
val interfaceInformation = buildInterfaceInformation(type)
val observedConstructor = constructorForDeserialization(type) ?: return LocalTypeInformation.NonComposable(
val observedConstructor = constructorForDeserialization(type) ?: return NonComposable(
observedType = type,
typeIdentifier = typeIdentifier,
constructor = null,
properties = if (rawType == Class::class.java) {
properties = if (rawType === Class::class.java) {
// Do NOT drill down into the internals of java.lang.Class.
} else {
@ -220,7 +241,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
if (!propertiesSatisfyConstructor(constructorInformation, properties)) {
val missingConstructorProperties = missingMandatoryConstructorProperties(constructorInformation, properties)
val missingParameters = missingConstructorProperties.map(LocalConstructorParameterInformation::name)
return LocalTypeInformation.NonComposable(
return NonComposable(
observedType = type,
typeIdentifier = typeIdentifier,
constructor = constructorInformation,
@ -229,16 +250,16 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
interfaces = interfaceInformation,
typeParameters = typeParameterInformation,
nonComposableSubtypes = missingConstructorProperties
.filterIsInstanceTo(LinkedHashSet(), LocalTypeInformation.NonComposable::class.java),
.filterIsInstanceTo(LinkedHashSet(), NonComposable::class.java),
reason = "Mandatory constructor parameters $missingParameters are missing from the readable properties ${properties.keys}",
remedy = "Either provide getters or readable fields for $missingParameters, or provide a custom serializer for this type"
val nonComposableProperties = properties.filterValues { it.type is LocalTypeInformation.NonComposable }
val nonComposableProperties = properties.filterValues { it.type is NonComposable }
if (nonComposableProperties.isNotEmpty()) {
return LocalTypeInformation.NonComposable(
return NonComposable(
observedType = type,
typeIdentifier = typeIdentifier,
constructor = constructorInformation,
@ -247,7 +268,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
interfaces = interfaceInformation,
typeParameters = typeParameterInformation,
nonComposableSubtypes = nonComposableProperties.values.mapTo(LinkedHashSet()) {
it.type as LocalTypeInformation.NonComposable
it.type as NonComposable
reason = nonComposablePropertiesErrorReason(nonComposableProperties),
remedy = "Either ensure that the properties ${nonComposableProperties.keys} are serializable, or provide a custom serializer for this type"
@ -260,7 +281,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
EvolutionConstructorInformation(evolutionConstructorInformation, evolutionProperties)
return LocalTypeInformation.Composable(type, typeIdentifier, constructorInformation, evolutionConstructors, properties,
return Composable(type, typeIdentifier, constructorInformation, evolutionConstructors, properties,
superclassInformation, interfaceInformation, typeParameterInformation)
@ -268,13 +289,13 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun propertiesSatisfyConstructor(constructorInformation: LocalConstructorInformation, properties: Map<PropertyName, LocalPropertyInformation>): Boolean {
if (!constructorInformation.hasParameters) return true
val indicesAddressedByProperties = properties.values.asSequence().mapNotNull {
val indicesAddressedByProperties = properties.values.asSequence().mapNotNullTo(LinkedHashSet()) {
when (it) {
is LocalPropertyInformation.ConstructorPairedProperty -> it.constructorSlot.parameterIndex
is LocalPropertyInformation.PrivateConstructorPairedProperty -> it.constructorSlot.parameterIndex
else -> null
return (constructorInformation.parameters.indices).none { index ->
constructorInformation.parameters[index].isMandatory && index !in indicesAddressedByProperties
@ -287,13 +308,13 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
): List<LocalConstructorParameterInformation> {
if (!constructorInformation.hasParameters) return emptyList()
val indicesAddressedByProperties = properties.values.asSequence().mapNotNull {
val indicesAddressedByProperties = properties.values.asSequence().mapNotNullTo(LinkedHashSet()) {
when (it) {
is LocalPropertyInformation.ConstructorPairedProperty -> it.constructorSlot.parameterIndex
is LocalPropertyInformation.PrivateConstructorPairedProperty -> it.constructorSlot.parameterIndex
else -> null
return (constructorInformation.parameters.indices).mapNotNull { index ->
val parameter = constructorInformation.parameters[index]
@ -306,7 +327,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
private fun nonComposablePropertiesErrorReason(nonComposableProperties: Map<PropertyName, LocalPropertyInformation>): String {
val reasons = nonComposableProperties.entries.joinToString("\n") { (key, value) ->
"$key [${value.type.observedType}]: ${(value.type as LocalTypeInformation.NonComposable).reason}"
"$key [${value.type.observedType}]: ${(value.type as NonComposable).reason}"
.replace("\n", "\n ")
return "Has properties ${nonComposableProperties.keys} of types that are not serializable:\n" + reasons
@ -374,7 +395,7 @@ internal data class LocalTypeInformationBuilder(val lookup: LocalTypeLookup,
val property = makeConstructorPairedProperty(
if (property == null) null else normalisedName to property
@ -1,6 +1,8 @@
package net.corda.serialization.internal.model
import java.lang.reflect.*
import java.util.function.Function
import java.util.function.Predicate
* Provides a means for looking up [LocalTypeInformation] by [Type] and [TypeIdentifier], falling back to building it
@ -24,6 +26,12 @@ interface LocalTypeLookup {
* because it is not whitelisted.
fun isExcluded(type: Type): Boolean
* These classes are used by [LocalTypeInformationBuilder] to
* build the correct [LocalTypeInformation] subclasses.
val baseTypes: BaseLocalTypes
@ -72,6 +80,8 @@ class ConfigurableLocalTypeModel(private val typeModelConfiguration: LocalTypeMo
override fun isExcluded(type: Type): Boolean = typeModelConfiguration.isExcluded(type)
override val baseTypes = typeModelConfiguration.baseTypes
* Merge the local cache back into the global cache, once we've finished traversal (and patched all cycles).
@ -111,4 +121,20 @@ interface LocalTypeModelConfiguration {
* [LocalTypeInformation], usually because they are not included in a whitelist.
fun isExcluded(type: Type): Boolean
* These classes are used by [LocalTypeInformationBuilder] to
* build the correct [LocalTypeInformation] subclasses.
val baseTypes: BaseLocalTypes
class BaseLocalTypes(
val collectionClass: Class<*>,
val enumSetClass: Class<*>,
val exceptionClass: Class<*>,
val mapClass: Class<*>,
val stringClass: Class<*>,
val isEnum: Predicate<Class<*>>,
val enumConstants: Function<Class<*>, Array<out Any>>
@ -2,8 +2,6 @@ package net.corda.serialization.internal.model
import net.corda.core.serialization.SerializationContext
import net.corda.serialization.internal.carpenter.*
import java.io.NotSerializableException
import java.lang.ClassCastException
import java.lang.reflect.Type
@ -26,6 +26,8 @@ object CreateStateFlow {
@ -56,6 +58,9 @@ object CreateStateFlow {
class Initiator(private val randomValue: String, private val errorTarget: Int) : FlowLogic<UniqueIdentifier>() {
companion object {
var onExitingCall: () -> Unit = {}
override fun call(): UniqueIdentifier {
@ -93,6 +98,7 @@ object CreateStateFlow {
logger.info("Test flow: returning")
return state.linearId
@ -1,8 +1,12 @@
package com.r3.dbfailure.workflows
import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import java.security.InvalidParameterException
@ -12,10 +16,15 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
val log = contextLogger()
var onError: ((Throwable) -> Unit)? = null
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
var throwUnrecoverableError = false
init {
services.vaultService.rawUpdates.subscribe { (_, produced) ->
val onNext: (Vault.Update<ContractState>) -> Unit =
{ (_, produced) ->
produced.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
@ -26,9 +35,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
val session = services.jdbcSession()
val statement = session.createStatement()
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
log.info("SQL result: ${statement.resultSet}")
@ -37,9 +46,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
val session = services.jdbcSession()
val statement = session.createStatement()
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
log.info("SQL result: ${statement.resultSet}")
@ -48,9 +57,9 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
val session = services.jdbcSession()
val statement = session.createStatement()
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
log.info("SQL result: ${statement.resultSet}")
@ -59,8 +68,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
val session = services.jdbcSession()
val statement = session.createStatement()
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
log.info("SQL result: ${statement.resultSet}")
@ -69,8 +78,8 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
val session = services.jdbcSession()
val statement = session.createStatement()
val rs = statement.executeQuery(
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
@ -80,13 +89,25 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
log.info("Throw InvalidParameterException")
throw InvalidParameterException("Toys out of pram")
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
log.info("Throw Exception")
throw Exception("Mother of all exceptions")
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
// this bit of code should only work in a OutOfProcess node,
// otherwise it will kill the testing jvm (including the testing thread)
if (throwUnrecoverableError) {
log.info("Throw Unrecoverable error")
throw OutOfMemoryError("Unrecoverable error")
else -> {
// do nothing, everything else must be handled elsewhere
} catch (t: Throwable) {
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
log.warn("Service not letting errors escape", t)
} else {
throw t
@ -94,5 +115,19 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
if (onError != null) {
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
} else {
class MakeServiceThrowErrorFlow: FlowLogic<Unit>() {
override fun call() {
throwUnrecoverableError = true
@ -0,0 +1,54 @@
package com.r3.transactionfailure.workflows
import co.paralleluniverse.fibers.Suspendable
import com.r3.dbfailure.contracts.DbFailureContract
import com.r3.dbfailure.workflows.CreateStateFlow
import net.corda.core.contracts.Command
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.seconds
object ErrorHandling {
var hookBeforeFirstCheckpoint: () -> Unit = {}
var hookAfterFirstCheckpoint: () -> Unit = {}
var hookAfterSecondCheckpoint: () -> Unit = {}
class CheckpointAfterErrorFlow(private val errorTarget: Int) : FlowLogic<Unit>() {
// We cannot allow this:
// recordTransactions -> throws HospitalizeException
// flow suppress the HospitalizeException
// flow checkpoints
override fun call() {
val notary = serviceHub.networkMapCache.notaryIdentities[0]
hookBeforeFirstCheckpoint.invoke() // should be executed once
sleep(1.seconds) // checkpoint - flow should retry from this one
hookAfterFirstCheckpoint.invoke() // should be executed twice
val txTarget = CreateStateFlow.getTxTarget(errorTarget)
val state = DbFailureContract.TestState(
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value",
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
try {
} catch(t: Throwable) {
if (CreateStateFlow.getFlowTarget(errorTarget) == CreateStateFlow.ErrorTarget.FlowSwallowErrors) {
logger.info("Test flow: Swallowing all exception! Muahahaha!", t)
} else {
logger.info("Test flow: caught exception - rethrowing")
throw t
sleep(1.seconds) // checkpoint - this checkpoint should fail
hookAfterSecondCheckpoint.invoke() // should be never executed
@ -164,7 +164,7 @@ class DriverDSLImpl(
private val bytemanJarPath: String? by lazy {
try {
resolveJar("org.jboss.byteman.agent.Transformer", verbose = false)
} catch (e: Exception) {
@ -180,13 +180,16 @@ class DriverDSLImpl(
private fun resolveJar(className: String): String {
private fun resolveJar(className: String, verbose: Boolean = true): String {
return try {
val type = Class.forName(className)
val src = type.protectionDomain.codeSource
} catch (e: Exception) {
log.warn("Unable to locate JAR for class given by `$className` on classpath: ${e.message}", e)
when (verbose) {
true -> log.warn("Unable to locate JAR for class given by `$className` on classpath:", e)
false -> log.info("Unable to locate JAR for class given by `$className` on classpath")
throw e
@ -55,18 +55,6 @@ class ShellCmdLineOptions {
var password: String? = null
names = ["--sshd-port"],
description = ["Enables SSH server for shell."]
var sshdPort: String? = null
names = ["--sshd-hostkey-directory"],
description = ["The directory with hostkey.pem file for SSH server."]
var sshdHostKeyDirectory: Path? = null
names = ["--truststore-password"],
description = ["The password to unlock the TrustStore file."]
@ -100,11 +88,6 @@ class ShellCmdLineOptions {
trustStoreFile?.apply { cmdOpts["ssl.truststore.path"] = this.toString() }
trustStorePassword?.apply { cmdOpts["ssl.truststore.password"] = this }
trustStoreType?.apply { cmdOpts["ssl.truststore.type"] = this }
sshdPort?.apply {
cmdOpts["extensions.sshd.port"] = this
cmdOpts["extensions.sshd.enabled"] = true
sshdHostKeyDirectory?.apply { cmdOpts["extensions.sshd.hostkeypath"] = this.toString() }
return ConfigFactory.parseMap(cmdOpts)
@ -140,19 +123,12 @@ private class ShellConfigurationFile {
val path: String
data class Sshd(
val enabled: Boolean,
val port: Int,
val hostkeypath: String?
data class Commands(
val path: String
data class Extensions(
val cordapps: Cordapps?,
val sshd: Sshd?,
val commands: Commands?
@ -187,9 +163,7 @@ private class ShellConfigurationFile {
user = node.user ?: "",
password = node.password ?: "",
hostAndPort = NetworkHostAndPort(node.addresses.rpc.host, node.addresses.rpc.port),
ssl = sslOptions,
sshdPort = extensions?.sshd?.let { if (it.enabled) it.port else null },
sshHostKeyDirectory = extensions?.sshd?.let { if (it.enabled && it.hostkeypath != null) Paths.get(it.hostkeypath) else null })
ssl = sslOptions)
@ -105,7 +105,6 @@ class StandaloneShell : CordaCliWrapper("corda-shell", "The Corda standalone she
InteractiveShell.runLocalShell {
configuration.sshdPort?.apply{ println("SSH server listening on port $this.") }
// because we can't clean certain Crash Shell threads that block on read()
@ -21,7 +21,6 @@ class StandaloneShellArgsParserTest {
assertEquals(expectedOptions.port, null)
assertEquals(expectedOptions.user, null)
assertEquals(expectedOptions.password, null)
assertEquals(expectedOptions.sshdPort, null)
@ -34,8 +33,6 @@ class StandaloneShellArgsParserTest {
options.port = "1234"
options.user = "demo"
options.password = "abcd1234"
options.sshdPort = "2223"
options.sshdHostKeyDirectory = Paths.get("/x/y/ssh")
options.trustStorePassword = "pass2"
options.trustStoreFile = Paths.get("/x/y/truststore.jks")
options.trustStoreType = "dummy"
@ -50,8 +47,8 @@ class StandaloneShellArgsParserTest {
password = "abcd1234",
hostAndPort = NetworkHostAndPort("alocalhost", 1234),
ssl = expectedSsl,
sshdPort = 2223,
sshHostKeyDirectory = Paths.get("/x/y/ssh"),
sshdPort = null,
sshHostKeyDirectory = null,
noLocalShell = false)
val config = options.toConfig()
@ -69,8 +66,6 @@ class StandaloneShellArgsParserTest {
options.port = null
options.user = null
options.password = null
options.sshdPort = null
options.sshdHostKeyDirectory = null
options.trustStorePassword = null
options.trustStoreFile = null
options.trustStoreType = null
@ -84,7 +79,7 @@ class StandaloneShellArgsParserTest {
ssl = ClientRpcSslOptions(
trustStorePath = Paths.get("/x/y/truststore.jks"),
trustStorePassword = "pass2"),
sshdPort = 2223)
sshdPort = null)
val config = options.toConfig()
@ -100,8 +95,6 @@ class StandaloneShellArgsParserTest {
options.port = null
options.user = null
options.password = "blabla"
options.sshdPort = null
options.sshdHostKeyDirectory = null
options.trustStorePassword = null
options.trustStoreFile = null
options.trustStoreType = null
@ -116,7 +109,7 @@ class StandaloneShellArgsParserTest {
password = "blabla",
hostAndPort = NetworkHostAndPort("alocalhost", 1234),
ssl = expectedSsl,
sshdPort = 2223)
sshdPort = null)
val config = options.toConfig()
@ -12,10 +12,6 @@ extensions {
cordapps {
path : "/x/y/cordapps"
sshd {
enabled : "true"
port : 2223
commands {
path : /x/y/commands
@ -46,16 +46,6 @@
required: false
multiParam: false
acceptableValues: []
- parameterName: "--sshd-hostkey-directory"
parameterType: "java.nio.file.Path"
required: false
multiParam: true
acceptableValues: []
- parameterName: "--sshd-port"
parameterType: "java.lang.String"
required: false
multiParam: false
acceptableValues: []
- parameterName: "--truststore-file"
parameterType: "java.nio.file.Path"
required: false
@ -626,7 +626,7 @@ object InteractiveShell {
} catch (e: StringToMethodCallParser.UnparseableCallException) {
out.println(e.message, Decoration.bold, Color.red)
if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) {
out.println("Please try 'man run' to learn what syntax is acceptable")
out.println("Please try 'run -h' to learn what syntax is acceptable")
} catch (e: Exception) {
out.println("RPC failed: ${e.rootCause}", Decoration.bold, Color.red)
Reference in New Issue
Block a user