@ -0,0 +1,8 @@
kind: StorageClass
name: testing-storage
storageAccount: testrestart
location: westeurope

.ci/dev/smoke/Jenkinsfile vendored Normal file
View File

@ -0,0 +1,72 @@
import static
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
triggers {
issueCommentTrigger('.*smoke tests.*')
agent { label 'k8s' }
options { timestamps() }
environment {
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}st"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
stages {
stage('Smoke Tests') {
steps {
script {
pullRequest.createStatus(status: 'pending',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Building',
targetUrl: "${env.JOB_URL}")
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-DpreAllocatePods=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"\"/tmp/\${EXECUTOR_NUMBER}\" " +
"\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelSmokeTest"
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: false
success {
script {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Passed',
targetUrl: "${env.JOB_URL}testResults")
failure {
script {
pullRequest.createStatus(status: 'failure',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Failed',
targetUrl: "${env.JOB_URL}testResults")
cleanup {
deleteDir() /* clean up our workspace */

View File

@ -3,14 +3,14 @@
<option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" />
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />

View File

@ -1,5 +1,6 @@
<component name="ProjectCodeStyleConfiguration">
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />

Jenkinsfile vendored
View File

@ -22,8 +22,8 @@ pipeline {
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
"\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace"
sh "kubectl auth can-i get pods"
@ -36,10 +36,20 @@ pipeline {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
"\"\${DOCKER_TAG_TO_USE}\"" +
" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace"
// stage('Unit Tests') {
// steps {
// sh "./gradlew " +
// "-DbuildId=\"\${BUILD_ID}\" " +
// "-Dkubenetize=true " +
// "\"\${DOCKER_TAG_TO_USE}\"" +
// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace"
// }
// }
@ -47,6 +57,7 @@ pipeline {
post {
always {
archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false
junit '**/build/test-results-xml/**/*.xml'
cleanup {

View File

@ -1,7 +1,8 @@
import net.corda.testing.DistributedTesting
import net.corda.testing.DistributeTestsBy
import net.corda.testing.ImageBuilding
import net.corda.testing.Distribution
import net.corda.testing.ParallelTestGroup
import net.corda.testing.PodLogLevel
import static org.gradle.api.JavaVersion.VERSION_11
import static org.gradle.api.JavaVersion.VERSION_1_8
@ -28,8 +29,7 @@ buildscript {
ext.quasar_version = constants.getProperty("quasarVersion11")
ext.quasar_classifier = constants.getProperty("quasarClassifier11")
ext.jdkClassifier = constants.getProperty("jdkClassifier11")
else {
} else {
ext.quasar_version = constants.getProperty("quasarVersion")
ext.quasar_classifier = constants.getProperty("quasarClassifier")
ext.jdkClassifier = constants.getProperty("jdkClassifier")
@ -333,7 +333,6 @@ allprojects {
maven { url "$artifactory_contextUrl/corda-dependencies" }
maven { url '' }
maven { url '' }
@ -597,36 +596,22 @@ buildScan {
task allParallelIntegrationTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "integrationTest"
numberOfShards 10
streamOutput false
coresPerFork 6
coresPerFork 5
memoryInGbPerFork 10
distribute Distribution.CLASS
task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest"
numberOfShards 4
streamOutput true
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.METHOD
distribute DistributeTestsBy.CLASS
task allParallelUnitTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "test"
numberOfShards 10
streamOutput false
coresPerFork 5
memoryInGbPerFork 6
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
@ -634,7 +619,7 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
@ -642,7 +627,15 @@ task parallelRegressionTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest", "smokeTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
apply plugin: ImageBuilding
apply plugin: DistributedTesting

View File

@ -1,8 +1,11 @@
package net.corda.testing
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import org.gradle.api.GradleException
import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.tasks.testing.Test
@ -18,19 +21,22 @@ class DistributedTesting implements Plugin<Project> {
void apply(Project project) {
if (System.getProperty("kubenetize") != null) {
def forks = getPropertyAsInt(project, "dockerForks", 1)
Integer forks = getPropertyAsInt(project, "dockerForks", 1)
ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding)
DockerPushImage imageBuildingTask = imagePlugin.pushTask
String providedTag = System.getProperty("docker.tag")
DockerPushImage imagePushTask = imagePlugin.pushTask
DockerBuildImage imageBuildTask = imagePlugin.buildTask
String tagToUseForRunningTests = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
String tagToUseForBuilding = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
BucketingAllocatorTask globalAllocator = project.tasks.create("bucketingAllocator", BucketingAllocatorTask, forks)
def requestedTasks = project.gradle.startParameter.taskNames.collect { project.tasks.findByPath(it) }
Set<String> requestedTaskNames = project.gradle.startParameter.taskNames.toSet()
def requestedTasks = requestedTaskNames.collect { project.tasks.findByPath(it) }
//in each subproject
//1. add the task to determine all tests within the module
//2. modify the underlying testing task to use the output of the listing task to include a subset of tests for each fork
//1. add the task to determine all tests within the module and register this as a source to the global allocator
//2. modify the underlying testing task to use the output of the global allocator to include a subset of tests for each fork
//3. KubesTest will invoke these test tasks in a parallel fashion on a remote k8s cluster
//4. after each completed test write its name to a file to keep track of what finished for restart purposes
project.subprojects { Project subProject ->
@ -45,32 +51,49 @@ class DistributedTesting implements Plugin<Project> {
println "Skipping modification of ${task.getPath()} as it's not scheduled for execution"
if (!task.hasProperty("ignoreForDistribution")) {
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag)
//this is what enables execution of a single test suite - for example node:parallelTest would execute all unit tests in node, node:parallelIntegrationTest would do the same for integration tests
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imagePushTask, tagToUseForRunningTests)
//now we are going to create "super" groupings of these KubesTest tasks, so that it is possible to invoke all submodule tests with a single command
//group all kubes tests by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
//now we are going to create "super" groupings of the Test tasks, so that it is possible to invoke all submodule tests with a single command
//group all test Tasks by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<Test>> allTestTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
.findAll { task -> task instanceof KubesTest }
.groupBy { task -> task.taskToExecuteName }
.findAll { task -> task instanceof Test }
.groupBy { Test task -> }
//first step is to create a single task which will invoke all the submodule tasks for each grouping
//ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc]
//ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc]
//ie allUnitAndIntegrationTest will invoke [node:integrationTest, node:test, core:integrationTest, core:test, client:rpc:test , client:rpc:integrationTest ... etc]
Set<ParallelTestGroup> userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup))
Collection<ParallelTestGroup> userDefinedGroups = userGroups.forEach { testGrouping ->
List<KubesTest> groups = ((ParallelTestGroup) testGrouping).groups.collect {
String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ")
userGroups.forEach { testGrouping ->
//for each "group" (ie: test, integrationTest) within the grouping find all the Test tasks which have the same name.
List<Test> groups = ((ParallelTestGroup) testGrouping).groups.collect { allTestTasksGroupedByType.get(it) }.flatten()
//join up these test tasks into a single set of tasks to invoke (node:test, node:integrationTest...)
String superListOfTasks = groups.collect { it.path }.join(" ")
//generate a preAllocate / deAllocate task which allows you to "pre-book" a node during the image building phase
//this prevents time lost to cloud provider node spin up time (assuming image build time > provider spin up time)
def (Task preAllocateTask, Task deAllocateTask) = generatePreAllocateAndDeAllocateTasksForGrouping(project, testGrouping)
//modify the image building task to depend on the preAllocate task (if specified on the command line) - this prevents gradle running out of order
if ( in requestedTaskNames) {
imageBuildTask.dependsOn preAllocateTask
def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" +, KubesTest) {
if (!providedTag) {
dependsOn imageBuildingTask
if (!tagToUseForRunningTests) {
dependsOn imagePushTask
if ( in requestedTaskNames) {
dependsOn deAllocateTask
numberOfPods = testGrouping.getShardCount()
printOutput = testGrouping.printToStdOut
@ -79,8 +102,9 @@ class DistributedTesting implements Plugin<Project> {
memoryGbPerFork = testGrouping.gbOfMemory
numberOfCoresPerFork = testGrouping.coresToUse
distribution = testGrouping.distribution
podLogLevel = testGrouping.logLevel
doFirst {
dockerTag = dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get())
dockerTag = tagToUseForRunningTests ? (ImageBuilding.registryName + ":" + tagToUseForRunningTests) : (imagePushTask.imageName.get() + ":" + imagePushTask.tag.get())
def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${}", KubesReporting) {
@ -99,6 +123,38 @@ class DistributedTesting implements Plugin<Project> {
private List<Task> generatePreAllocateAndDeAllocateTasksForGrouping(Project project, ParallelTestGroup testGrouping) {
PodAllocator allocator = new PodAllocator(project.getLogger())
Task preAllocateTask = project.rootProject.tasks.create("preAllocateFor" + {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
int seed = (dockerTag.hashCode() +
String podPrefix = new BigInteger(64, new Random(seed)).toString(36)
//here we will pre-request the correct number of pods for this testGroup
int numberOfPodsToRequest = testGrouping.getShardCount()
int coresPerPod = testGrouping.getCoresToUse()
int memoryGBPerPod = testGrouping.getGbOfMemory()
allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix)
Task deAllocateTask = project.rootProject.tasks.create("deAllocateFor" + {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
int seed = (dockerTag.hashCode() +
String podPrefix = new BigInteger(64, new Random(seed)).toString(36);
return [preAllocateTask, deAllocateTask]
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()
@ -120,55 +176,47 @@ class DistributedTesting implements Plugin<Project> {
private Test modifyTestTaskForParallelExecution(Project subProject, Test task, BucketingAllocatorTask globalAllocator) {"modifying task: ${task.getPath()} to depend on task ${globalAllocator.getPath()}")
def reportsDir = new File(new File(subProject.rootProject.getBuildDir(), "test-reports"), + "-" +
def reportsDir = new File(new File(KubesTest.TEST_RUN_DIR, "test-reports"), + "-" +
File executedTestsFile = new File(KubesTest.TEST_RUN_DIR + "/executedTests.txt")
task.configure {
dependsOn globalAllocator
binResultsDir new File(reportsDir, "binary")
reports.junitXml.destination new File(reportsDir, "xml")
maxHeapSize = "6g"
maxHeapSize = "10g"
doFirst {
filter {
List<String> executedTests = []
File executedTestsFile = new File(KubesTest.TEST_RUN_DIR + "/executedTests.txt")
try {
executedTests = executedTestsFile.readLines()
} catch (FileNotFoundException e) {
task.afterTest { desc, result ->
executedTestsFile.withWriterAppend { writer ->
writer.writeLine(desc.getClassName() + "." + desc.getName())
List<String> executedTests = executedTestsFile.readLines()
def fork = getPropertyAsInt(subProject, "dockerFork", 0)"requesting tests to include in testing task ${task.getPath()} (idx: ${fork})")
List<String> includes = globalAllocator.getTestIncludesForForkAndTestTask(
task) "got ${includes.size()} tests to include into testing task ${task.getPath()}"
if (includes.size() == 0) { "Disabling test execution for testing task ${task.getPath()}"
excludeTestsMatching "*"
executedTests.forEach { exclude -> "excluding: $exclude for testing task ${task.getPath()}"
excludeTestsMatching exclude
includes.forEach { include -> "including: $include for testing task ${task.getPath()}"
includeTestsMatching include
failOnNoMatchingTests false
afterTest { desc, result ->
executedTestsFile.withWriterAppend { writer ->
writer.writeLine(desc.getClassName() + "." + desc.getName())
return task

View File

@ -1,8 +1,17 @@
package net.corda.testing;
import com.bmuschko.gradle.docker.DockerRegistryCredentials;
import com.bmuschko.gradle.docker.tasks.container.*;
import com.bmuschko.gradle.docker.tasks.image.*;
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer;
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage;
import com.bmuschko.gradle.docker.tasks.image.DockerCommitImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPullImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage;
import com.bmuschko.gradle.docker.tasks.image.DockerRemoveImage;
import com.bmuschko.gradle.docker.tasks.image.DockerTagImage;
import org.gradle.api.GradleException;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
@ -12,6 +21,7 @@ import;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
* this plugin is responsible for setting up all the required docker image building tasks required for producing and pushing an
@ -20,7 +30,10 @@ import java.util.Map;
public class ImageBuilding implements Plugin<Project> {
public static final String registryName = "";
public static final String PROVIDE_TAG_FOR_BUILDING_PROPERTY = "";
public static final String PROVIDE_TAG_FOR_RUNNING_PROPERTY = "";
public DockerPushImage pushTask;
public DockerBuildImage buildTask;
public void apply(@NotNull final Project project) {
@ -43,6 +56,8 @@ public class ImageBuilding implements Plugin<Project> {
dockerBuildImage.getDockerFile().set(new File(new File("testing"), "Dockerfile"));
this.buildTask = buildDockerImageForSource;
final DockerCreateContainer createBuildContainer = project.getTasks().create("createBuildContainer", DockerCreateContainer.class,
dockerCreateContainer -> {
final File baseWorkingDir = new File(System.getProperty("") != null &&
@ -58,7 +73,7 @@ public class ImageBuilding implements Plugin<Project> {
project.getLogger().info("Will use: ${gradleDir.absolutePath} for caching gradle artifacts");
project.getLogger().info("Will use: " + gradleDir.getAbsolutePath() + " for caching gradle artifacts");
@ -103,8 +118,7 @@ public class ImageBuilding implements Plugin<Project> {
final DockerTagImage tagBuildImageResult = project.getTasks().create("tagBuildImageResult", DockerTagImage.class, dockerTagImage -> {
"${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"));
dockerTagImage.getTag().set(System.getProperty(PROVIDE_TAG_FOR_BUILDING_PROPERTY, UUID.randomUUID().toString().toLowerCase().substring(0, 12)));

View File

@ -1,22 +1,58 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import net.corda.testing.retry.Retry;
import okhttp3.Response;
import org.apache.commons.compress.utils.IOUtils;
import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;
import org.jetbrains.annotations.NotNull;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -33,18 +69,20 @@ public class KubesTest extends DefaultTask {
String fullTaskToExecutePath;
String taskToExecuteName;
Boolean printOutput = false;
Integer numberOfCoresPerFork = 4;
Integer memoryGbPerFork = 6;
public volatile List<File> testOutput = Collections.emptyList();
public volatile List<KubePodResult> containerResults = Collections.emptyList();
String namespace = "thisisatest";
public static String NAMESPACE = "thisisatest";
int k8sTimeout = 50 * 1_000;
int webSocketTimeout = k8sTimeout * 6;
int numberOfPods = 20;
int numberOfPods = 5;
int timeoutInMinutesForPodToStart = 60;
Distribution distribution = Distribution.METHOD;
DistributeTestsBy distribution = DistributeTestsBy.METHOD;
PodLogLevel podLogLevel = PodLogLevel.INFO;
public void runDistributedTests() {
@ -54,10 +92,8 @@ public class KubesTest extends DefaultTask {
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
String random = rnd64Base36(new Random());
final KubernetesClient client = getKubernetesClient();
try {
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
try (KubernetesClient client = getKubernetesClient()) {
client.pods().inNamespace(NAMESPACE).list().getItems().forEach(podToDelete -> {
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
@ -68,8 +104,9 @@ public class KubesTest extends DefaultTask {
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i;
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
String potentialPodName = (taskToExecuteName + "-" + stableRunId + random + i).toLowerCase();
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
return submitBuild(NAMESPACE, numberOfPods, i, podName, printOutput, 3);
this.testOutput = Collections.synchronizedList( -> {
@ -107,8 +144,7 @@ public class KubesTest extends DefaultTask {
private Future<KubePodResult> submitBuild(
KubernetesClient client,
private CompletableFuture<KubePodResult> submitBuild(
String namespace,
int numberOfPods,
int podIdx,
@ -116,87 +152,102 @@ public class KubesTest extends DefaultTask {
boolean printOutput,
int numberOfRetries
) {
return executorService.submit(() -> buildRunPodWithRetriesOrThrow(client, namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries));
return CompletableFuture.supplyAsync(() -> {
PersistentVolumeClaim pvc = createPvc(podName);
return buildRunPodWithRetriesOrThrow(namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries, pvc);
}, executorService);
private static void addShutdownHook(Runnable hook) {
Runtime.getRuntime().addShutdownHook(new Thread(hook));
private PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
private PersistentVolumeClaim createPvc(String name) {
PersistentVolumeClaim pvc;
try (KubernetesClient client = getKubernetesClient()) {
pvc = client.persistentVolumeClaims()
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
addShutdownHook(() -> {
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
try (KubernetesClient client = getKubernetesClient()) {
System.out.println("Deleting PVC: " + pvc.getMetadata().getName());
return pvc;
private KubePodResult buildRunPodWithRetriesOrThrow(
KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries
) {
int numberOfRetries,
PersistentVolumeClaim pvc) {
addShutdownHook(() -> {
System.out.println("deleting pod: " + podName);
try (KubernetesClient client = getKubernetesClient()) {
try {
// pods might die, so we retry
return Retry.fixed(numberOfRetries).run(() -> {
// remove pod if exists
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
Pod createdPod;
try (KubernetesClient client = getKubernetesClient()) {
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
getProject().getLogger().lifecycle("creating pod: " + podName);
createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName, pvc));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
// recreate and run
getProject().getLogger().lifecycle("creating pod: " + podName);
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
attachStatusListenerToPod(client, createdPod);
waitForPodToStart(client, createdPod);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
CompletableFuture<Integer> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
.exec(getBuildCommand(numberOfPods, podIdx));
File podOutput = executeBuild(namespace, numberOfPods, podIdx, podName, printOutput, stdOutOs, stdOutIs, errChannelStream, waiter);
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
int resCode = waiter.join();
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ") with result " + resCode + " , gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(namespace, createdPod);
getLogger().lifecycle("removing pod " + podName + " (" + podIdx + "/" + numberOfPods + ") after completed build");
File podLogsDirectory = new File(getProject().getBuildDir(), "pod-logs");
if (!podLogsDirectory.exists()) {
File logFileToArchive = new File(podLogsDirectory, podName + ".log");
try (FileInputStream logIn = new FileInputStream(podOutput); FileOutputStream logOut = new FileOutputStream(logFileToArchive)) {
IOUtils.copy(logIn, logOut);
try (KubernetesClient client = getKubernetesClient()) {
return new KubePodResult(resCode, podOutput, binaryResults);
} catch (Retry.RetryException e) {
@ -204,7 +255,32 @@ public class KubesTest extends DefaultTask {
private Pod buildPodRequest(String podName) {
private File executeBuild(String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
PipedOutputStream stdOutOs,
PipedInputStream stdOutIs,
ByteArrayOutputStream errChannelStream,
CompletableFuture<Integer> waiter) throws IOException {
KubernetesClient client = getKubernetesClient();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, client);
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
getProject().getLogger().quiet("About to execute " +"", (s, s2) -> s + " " + s2) + " on pod " + podName);
.exec(getBuildCommand(numberOfPods, podIdx));
return startLogPumping(stdOutIs, podIdx, printOutput);
private Pod buildPodRequest(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
@ -217,23 +293,12 @@ public class KubesTest extends DefaultTask {
// .addNewVolume()
// .withName("testruns")
// .withNewPersistentVolumeClaim()
// .withClaimName(pvc.getMetadata().getName())
// .endPersistentVolumeClaim()
// .endVolume()
@ -283,7 +348,8 @@ public class KubesTest extends DefaultTask {
return outputFile;
private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
private Watch attachStatusListenerToPod(Pod pod) {
KubernetesClient client = getKubernetesClient();
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
public void eventReceived(Watcher.Action action, Pod resource) {
@ -292,22 +358,25 @@ public class KubesTest extends DefaultTask {
public void onClose(KubernetesClientException cause) {
private void waitForPodToStart(KubernetesClient client, Pod pod) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
private void waitForPodToStart(Pod pod) {
try (KubernetesClient client = getKubernetesClient()) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
private Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports";
private Collection<File> downloadTestXmlFromPod(String namespace, Pod cp) {
String resultsInContainerPath = TEST_RUN_DIR + "/test-reports";
String binaryResultsFile = "results.bin";
String podName = cp.getMetadata().getName();
Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath();
@ -315,25 +384,43 @@ public class KubesTest extends DefaultTask {
if (!tempDir.toFile().exists()) {
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
try (KubernetesClient client = getKubernetesClient()) {
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
private String[] getBuildCommand(int numberOfPods, int podIdx) {
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
String shellScript = "(let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ) && "
+ " cd /tmp/source && " +
"(let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ) && " +
"(./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " " + getLoggingLevel() + " 2>&1) ; " +
"let rs=$? ; sleep 10 ; exit ${rs}";
return new String[]{"bash", "-c", shellScript};
private String getLoggingLevel() {
switch (podLogLevel) {
case INFO:
return " --info";
case WARN:
return " --warn";
case QUIET:
return " --quiet";
case DEBUG:
return " --debug";
throw new IllegalArgumentException("LogLevel: " + podLogLevel + " is unknown");
private List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
List<File> folders = new ArrayList<>();
@ -344,13 +431,13 @@ public class KubesTest extends DefaultTask {
if (fileToInspect.isDirectory()) {
filesToInspect.addAll( File[]{})).collect(Collectors.toList()));
return folders;
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture) {
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture, KubernetesClient client) {
return new ExecListener() {
final Long start = System.currentTimeMillis();
@ -362,7 +449,7 @@ public class KubesTest extends DefaultTask {
public void onFailure(Throwable t, Response response) {
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
getProject().getLogger().lifecycle("Received error from pod " + podName);
@ -380,6 +467,8 @@ public class KubesTest extends DefaultTask {
} catch (Exception e) {
} finally {

View File

@ -47,7 +47,7 @@ class ListTests extends DefaultTask implements TestLister {
FileCollection scanClassPath
List<String> allTests
Distribution distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? Distribution.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : Distribution.METHOD
DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD
def getTestsForFork(int fork, int forks, Integer seed) {
def gitSha = new BigInteger(project.hasProperty("corda_revision") ?"corda_revision").toString() : "0", 36)
@ -66,7 +66,7 @@ class ListTests extends DefaultTask implements TestLister {
def discoverTests() {
switch (distribution) {
case Distribution.METHOD:
case DistributeTestsBy.METHOD:
Collection<String> results = new ClassGraph()
@ -85,7 +85,7 @@ class ListTests extends DefaultTask implements TestLister {
this.allTests =
case Distribution.CLASS:
case DistributeTestsBy.CLASS:
Collection<String> results = new ClassGraph()
@ -105,6 +105,6 @@ class ListTests extends DefaultTask implements TestLister {
public enum Distribution {
public enum DistributeTestsBy {

View File

@ -3,19 +3,25 @@ package net.corda.testing
import org.gradle.api.DefaultTask
class ParallelTestGroup extends DefaultTask {
Distribution distribution = Distribution.METHOD
DistributeTestsBy distribution = DistributeTestsBy.METHOD
List<String> groups = new ArrayList<>()
int shardCount = 20
int coresToUse = 4
int gbOfMemory = 4
boolean printToStdOut = true
PodLogLevel logLevel = PodLogLevel.INFO
void numberOfShards(int shards) {
this.shardCount = shards
void distribute(Distribution dist){
void podLogLevel(PodLogLevel level) {
this.logLevel = level
void distribute(DistributeTestsBy dist) {
this.distribution = dist

View File

@ -0,0 +1,135 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PodAllocator {
private static final int CONNECTION_TIMEOUT = 60_1000;
private final Logger logger;
public PodAllocator(Logger logger) {
this.logger = logger;
public PodAllocator() {
this.logger = LoggerFactory.getLogger(PodAllocator.class);
public void allocatePods(Integer number, Integer coresPerPod, Integer memoryPerPod, String prefix) {
Config config = new ConfigBuilder()
KubernetesClient client = new DefaultKubernetesClient(config);
List<Pod> podsToRequest = IntStream.range(0, number).mapToObj(i -> buildPod("pa-" + prefix + i, coresPerPod, memoryPerPod)).collect(Collectors.toList());
podsToRequest.forEach(requestedPod -> {
String msg = "PreAllocating " + requestedPod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(msg);
} else {;
public void tearDownPods(String prefix) {
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
KubernetesClient client = new DefaultKubernetesClient(config);
Stream<Pod> podsToDelete = client.pods().inNamespace(KubesTest.NAMESPACE).list()
.sorted(Comparator.comparing(p -> p.getMetadata().getName()))
.filter(foundPod -> foundPod.getMetadata().getName().contains(prefix));
List<CompletableFuture<Pod>> deleteFutures = -> {
CompletableFuture<Pod> result = new CompletableFuture<>();
Watch watch = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
public void eventReceived(Action action, Pod resource) {
if (action == Action.DELETED) {
String msg = "Successfully deleted pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).lifecycle(msg);
} else {;
public void onClose(KubernetesClientException cause) {
String message = "Failed to delete pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(message);
} else {;
return result;
try {
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//ignore - there's nothing left to do
Pod buildPod(String podName, Integer coresPerPod, Integer memoryPerPod) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withArgs("-c", "sleep 300")
.addToRequests("cpu", new Quantity(coresPerPod.toString()))
.addToRequests("memory", new Quantity(memoryPerPod.toString() + "Gi"))

View File

@ -0,0 +1,5 @@
package net.corda.testing;
public enum PodLogLevel {

View File

@ -75,13 +75,13 @@ class CordaRPCConnection private constructor(
override val serverProtocolVersion: Int get() = actualConnection.serverProtocolVersion
override fun notifyServerAndClose() = actualConnection.notifyServerAndClose()
override fun notifyServerAndClose() = doCloseLogic { actualConnection.notifyServerAndClose() }
override fun forceClose() = actualConnection.forceClose()
override fun forceClose() = doCloseLogic { actualConnection.forceClose() }
override fun close() {
private inline fun doCloseLogic(close: () -> Unit) {
try {
} finally {
observersPool?.apply {
@ -286,6 +286,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* The default value is 5.
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
@Suppress("unused") // constructor for java
constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) :
this(onDisconnect = { }, onReconnect = { }, maxAttempts = maxAttempts)
@ -321,11 +322,15 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
* If you want to enable a more graceful form of reconnection, you can make use of the gracefulReconnect argument of the [start] method.
* If this is set to true, then:
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for the subscribed [rx.Observable]s.
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or
* multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been
* re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for
* the subscribed [rx.Observable]s.
* Note: In this approach, some events might be lost during a re-connection and not sent in the subscribed [rx.Observable]s.
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing a [CouldNotStartFlowException].
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing
* a [CouldNotStartFlowException].
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
@ -333,7 +338,8 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s and [SerializationWhitelist]s
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s
* and [SerializationWhitelist]s
* If the created RPC client is intended to use types with custom serializers / whitelists,
* a classloader will need to be provided that contains the associated CorDapp jars.

View File

@ -25,6 +25,9 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.
* If the server is not available this method may block for a short period until it's clear the server is not
* coming back.
* Note: this will also be the implementation of [close] so won't be needed when using [use] or `try-with-resources`
* blocks.
fun notifyServerAndClose()
@ -37,4 +40,6 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* block waiting for it to come back, which typically happens in integration tests and demos rather than production.
fun forceClose()
override fun close() = notifyServerAndClose()

View File

@ -120,10 +120,6 @@ class RPCClient<I : RPCOps>(
override fun forceClose() {
override fun close() {
} catch (exception: Throwable) {

View File

@ -1,7 +1,18 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.*
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.DIED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
@ -89,7 +100,12 @@ class ReconnectingCordaRPCOps private constructor(
* Note that this method does not guarantee 100% that the flow will not be started twice.
fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
fun runFlowWithLogicalRetry(
runFlow: (CordaRPCOps) -> StateMachineRunId,
hasFlowStarted: (CordaRPCOps) -> Boolean,
onFlowConfirmed: () -> Unit = {},
timeout: Duration = 4.seconds
) {
try {
@ -149,7 +165,7 @@ class ReconnectingCordaRPCOps private constructor(
currentState = DIED
//TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.warn("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e)
@ -233,11 +249,6 @@ class ReconnectingCordaRPCOps private constructor(
currentState = CLOSED
override fun close() {
currentState = CLOSED
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
@ -267,22 +278,22 @@ class ReconnectingCordaRPCOps private constructor(
} catch (e: InvocationTargetException) {
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${} rejected. Retrying when node is up...", e)
log.warn("Node is being shutdown. Operation ${} rejected. Retrying when node is up...", e)
is ConnectionFailureException -> {
log.error("Failed to perform operation ${}. Connection dropped. Retrying....", e)
log.warn("Failed to perform operation ${}. Connection dropped. Retrying....", e)
checkIfIsStartFlow(method, e)
is RPCException -> {
log.error("Failed to perform operation ${}. RPCException. Retrying....", e)
log.warn("Failed to perform operation ${}. RPCException. Retrying....", e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
checkIfIsStartFlow(method, e)
else -> {
log.error("Failed to perform operation ${}. Unknown error. Retrying....", e)
log.warn("Failed to perform operation ${}. Unknown error. Retrying....", e)
checkIfIsStartFlow(method, e)

View File

@ -161,7 +161,6 @@
<ID>ComplexMethod:IRS.kt$InterestRateSwap.FloatingLeg$override fun equals(other: Any?): Boolean</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List&lt;String&gt;, out: RenderPrintWriter, context: InvocationContext&lt;out Any&gt;, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any?</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun read(kryo: Kryo, input: Input, type: Class&lt;T&gt;): T</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun write(kryo: Kryo, output: Output, obj: T)</ID>
@ -392,7 +391,6 @@
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Resurrect or reimplement the mail plugin.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Review or fix the JVM commands which have bitrotted and some are useless.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: A default renderer could be used, instead of an object mapper. See:</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Currently the certificate revocation status is not handled here. Nowhere in the code the second parameter is used. Consider adding the support in the future.</ID>
<ID>ForbiddenComment:IrsDemoClientApi.kt$IRSDemoClientApi$// TODO: Add uploading of files to the HTTP API</ID>
@ -1949,7 +1947,6 @@
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$error("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, $it")</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$is SchemaManagementException -&gt; throw HibernateSchemaChangeException("Incompatible schema change detected. Please run the node with database.initialiseSchema=true. Reason: ${e.message}", e)</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?</ID>
<ID>MaxLineLength:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$, OpaqueBytes.of(1), identity), { it.resultFuture }.getOrThrow()</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$, OpaqueBytes.of(0), identity), { it.resultFuture }.getOrThrow()</ID>
@ -1965,11 +1962,6 @@
<ID>MaxLineLength:CordaRPCOps.kt$CordaRPCOps$fun &lt;T : ContractState&gt; vaultTrackByWithSorting(contractStateType: Class&lt;out T&gt;, criteria: QueryCriteria, sorting: Sort): DataFeed&lt;Vault.Page&lt;T&gt;, Vault.Update&lt;T&gt;&gt;</ID>
<ID>MaxLineLength:CordaRPCOps.kt$StateMachineInfo$return copy(id = id, flowLogicClassName = flowLogicClassName, initiator = initiator, progressTrackerStepAndUpdates = progressTrackerStepAndUpdates, invocationContext = invocationContext)</ID>
<ID>MaxLineLength:CordaRPCOps.kt$sorting: Sort = Sort(emptySet())</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$is ConnectException -&gt; throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$return StateMachineInfo(flowLogic.runId,, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl${ error -&gt; logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$assertThatCode { rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow() }.doesNotThrowAnyException()</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$val cash = rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow().stx.tx.outRefsOfType&lt;Cash.State&gt;().single()</ID>
<ID>MaxLineLength:CordaSSHAuthInfo.kt$CordaSSHAuthInfo : AuthInfo</ID>
@ -3151,7 +3143,6 @@
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveStateAndRefFlow&lt;out T : ContractState&gt; : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow$private val statesToRecord: StatesToRecord = StatesToRecord.NONE</ID>
<ID>MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps$ fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -&gt; StateMachineRunId, hasFlowStarted: (CordaRPCOps) -&gt; Boolean, onFlowConfirmed: () -&gt; Unit = {}, timeout: Duration = 4.seconds)</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$output(, "UPDATED REF DATA", "REF DATA".output&lt;ExampleState&gt;().copy(data = "NEW STUFF!"))</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$val stateAndRef = StateAndRef(TransactionState(state, CONTRACT_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint), StateRef(SecureHash.zeroHash, 0))</ID>
<ID>MaxLineLength:ReferencedStatesFlowTests.kt$ReferencedStatesFlowTests$assertEquals(2, nodes[2].services.vaultService.queryBy&lt;LinearState&gt;(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)</ID>
@ -3803,7 +3794,6 @@
<ID>NestedBlockDepth:FetchDataFlow.kt$FetchAttachmentsFlow$override fun maybeWriteToDisk(downloaded: List&lt;Attachment&gt;)</ID>
<ID>NestedBlockDepth:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>NestedBlockDepth:InternalUtils.kt$ inline fun &lt;T&gt; Iterable&lt;T&gt;.noneOrSingle(predicate: (T) -&gt; Boolean): T?</ID>
<ID>NestedBlockDepth:JarSignatureTestUtils.kt$JarSignatureTestUtils$fun Path.addManifest(fileName: String, vararg entries: Pair&lt;Attributes.Name, String&gt;)</ID>
<ID>NestedBlockDepth:Main.kt$Node$fun avalancheLoop()</ID>
@ -4575,9 +4565,6 @@
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
@ -4723,9 +4710,6 @@
<ID>WildcardImport:InputStreamSerializer.kt$import net.corda.serialization.internal.amqp.*</ID>
<ID>WildcardImport:InstallFactory.kt$import tornadofx.*</ID>
<ID>WildcardImport:InstallShellExtensionsParser.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import java.lang.reflect.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:InterestRatesSwapDemoAPI.kt$import org.springframework.web.bind.annotation.*</ID>
@ -4977,8 +4961,6 @@
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:ReceiveTransactionFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*</ID>
<ID>WildcardImport:ReferenceInputStateTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.flows.*</ID>

View File

@ -51,7 +51,7 @@ anything set earlier.
.. code-block:: none
custom = {
jvmArgs: [ '-Xmx1G', '-XX:+UseG1GC' ]
jvmArgs: [ "-Xmx1G", "-XX:+UseG1GC" ]
Note that this will completely replace any defaults set by capsule above, not just the flags that are set here, so if you use this
@ -85,7 +85,7 @@ anything set earlier.
.. code-block:: none
custom = {
jvmArgs: [ '-Xmx1G', '-XX:+UseG1GC', '-XX:-HeapDumpOnOutOfMemoryError' ]
jvmArgs: [ "-Xmx1G", "-XX:+UseG1GC", "-XX:-HeapDumpOnOutOfMemoryError" ]

View File

@ -17,16 +17,34 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.AttachmentTrustInfo
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.sign
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeDiagnosticInfo
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
@ -120,14 +138,16 @@ internal class CordaRPCOpsImpl(
return services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
@Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
val (snapshot, updates) = @Suppress("DEPRECATION") internalVerifiedTransactionsFeed()
val (snapshot, updates) = internalVerifiedTransactionsFeed()
return snapshot
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId)
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? =
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
@ -164,7 +184,8 @@ internal class CordaRPCOpsImpl(
return snapshot
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
override fun stateMachineRecordedTransactionMappingFeed():
DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return services.stateMachineRecordedTransactionMapping.track()
@ -292,7 +313,8 @@ internal class CordaRPCOpsImpl(
} catch (e: Exception) {
when (e) {
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes " +
"are incorrect configuration or network map service being down")
else -> throw e
@ -302,15 +324,26 @@ internal class CordaRPCOpsImpl(
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByCriteria(
criteria: QueryCriteria,
contractStateType: Class<out T>
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): Vault.Page<T> {
return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType)
@ -318,15 +351,26 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByCriteria(
contractStateType: Class<out T>,
criteria: QueryCriteria
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType)
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
@ -349,7 +393,10 @@ internal class CordaRPCOpsImpl(
{ }, // Nothing to do on each update here, only completion matters.
{ error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }
{ error ->
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. " +
"Cause was: ${error.message}", error)
} else {
@ -375,7 +422,13 @@ internal class CordaRPCOpsImpl(
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId,, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
return StateMachineInfo(
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {

View File

@ -109,7 +109,10 @@ class HibernateInteractionTests {
object PersistenceSchema: MappedSchema(, 1, listOf(, {
@Entity(name = "parents")
override val migrationResource: String?
get() = "hibernate-interactions-tests-schema"
@Entity(name = "parentstates")
class Parent: PersistentState() {
@ -122,7 +125,7 @@ class HibernateInteractionTests {
@Entity(name = "children")
@Entity(name = "childstates")
class Child(
// Do not change this: this generation type is required in order to trigger the proper cascade ordering.

View File

@ -0,0 +1,45 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns=""
<changeSet author="R3.Corda" id="hibernate-interaction-tests-changeset">
<createTable tableName="parentstates">
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
<column name="output_index" type="INT">
<constraints nullable="false"/>
<addPrimaryKey columnNames="output_index, transaction_id" constraintName="parents_pkey" tableName="parentstates"/>
<createTable tableName="childstates">
<column name="identifier" type="INT" autoIncrement="true">
<constraints nullable="false"/>
<column name="member" type="NVARCHAR(255)">
<constraints nullable="false"/>
<addPrimaryKey columnNames="identifier" constraintName="children_pkey" tableName="childstates"/>
<createTable tableName="parentstates_childstates">
<column name="parentstates_output_index" type="INT">
<constraints nullable="false"/>
<column name="parentstates_transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
<column name="children_identifier" type="INT">
<constraints nullable="false"/>
<addPrimaryKey columnNames="parentstates_output_index, parentstates_transaction_id, children_identifier" constraintName="parents_children_pkey" tableName="parentstates_childstates"/>
<addForeignKeyConstraint baseColumnNames="children_identifier" baseTableName="parentstates_childstates"
referencedColumnNames="identifier" referencedTableName="childstates"/>
<addForeignKeyConstraint baseColumnNames="parentstates_output_index, parentstates_transaction_id" baseTableName="parentstates_childstates"
referencedColumnNames="output_index, transaction_id" referencedTableName="parentstates"/>

View File

@ -81,7 +81,8 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
nodeDefaults {
projectCordapp {

View File

@ -57,7 +57,6 @@ repositories {
maven {
url '' // docker-compose-rule is published on bintray
maven { url '' }
dependencies {

View File

@ -8,6 +8,7 @@ import;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@ -24,6 +25,19 @@ class SharedMemoryIncremental extends PortAllocation {
private int startPort;
private int endPort;
private MappedByteBuffer mb;
private Long startingAddress;
private File file = new File(System.getProperty("user.home"), "corda-" + startPort + "-to-" + endPort + "-port-allocator.bin");
private RandomAccessFile backingFile;
try {
backingFile = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
private SharedMemoryIncremental(int startPort, int endPort) {
this.startPort = startPort;
this.endPort = endPort;
@ -35,21 +49,9 @@ class SharedMemoryIncremental extends PortAllocation {
private File file = new File(System.getProperty("user.home"), "corda-" + startPort + "-to-" + endPort + "-port-allocator.bin");
private RandomAccessFile backingFile;
try {
backingFile = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
private MappedByteBuffer mb;
private Long startingAddress;
public static SharedMemoryIncremental INSTANCE = new SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT);
static private Unsafe UNSAFE = getUnsafe();
static private Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
@ -63,17 +65,31 @@ class SharedMemoryIncremental extends PortAllocation {
public int nextPort() {
Long oldValue;
Long newValue;
long oldValue;
long newValue;
boolean loopSuccess;
do {
oldValue = UNSAFE.getLongVolatile(null, startingAddress);
if (oldValue + 1 >= endPort || oldValue < startPort) {
newValue = Long.valueOf(startPort);
newValue = startPort;
} else {
newValue = (oldValue + 1);
} while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue));
boolean reserveSuccess = UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue);
boolean portAvailable = isLocalPortAvailable(newValue);
loopSuccess = reserveSuccess && portAvailable;
} while (!loopSuccess);
return newValue.intValue();
return (int) newValue;
private boolean isLocalPortAvailable(Long portToTest) {
try (ServerSocket serverSocket = new ServerSocket(Math.toIntExact(portToTest))) {
} catch (Exception e) {
return false;
return true;

View File

@ -11,6 +11,7 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.notUsed
@ -19,11 +20,21 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.Emoji
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.packageName_
import net.corda.core.internal.rootCause
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import org.crsh.command.InvocationContext
@ -52,12 +63,17 @@ import
import java.lang.reflect.*
import java.lang.reflect.GenericArrayType
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.UndeclaredThrowableException
import java.nio.file.Path
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
// TODO: Add command history.
@ -76,11 +92,12 @@ object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
private lateinit var ops: InternalCordaRPCOps
private lateinit var rpcConn: AutoCloseable
private lateinit var rpcConn: CordaRPCConnection
private var shell: Shell? = null
private var classLoader: ClassLoader? = null
private lateinit var shellConfiguration: ShellConfiguration
private var onExit: () -> Unit = {}
private const val uuidStringSize = 36
fun getCordappsClassloader() = classLoader
@ -131,13 +148,41 @@ object InteractiveShell {
ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.",
ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.",
ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.",
ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'",
ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.",
ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node",
ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node",
"Commands to inspect and update the output format.",
"Runs a method from the CordaRPCOps interface on the node.",
"Commands to work with flows. Flows are how you can change the ledger.",
"An alias for 'flow start'",
"Checks if a transaction with matching Id hash exists.",
"Commands to extract information about attachments stored within the node",
"Commands to extract information about checkpoints stored within the node",
shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password, runSshDaemon)
@ -294,7 +339,12 @@ object InteractiveShell {
try {
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper)
val stateObservable = runFlowFromString(
{ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) },
latch = CountDownLatch(1)
ansiProgressRenderer.render(stateObservable, latch::countDown)
@ -327,7 +377,8 @@ object InteractiveShell {
class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) {
override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
override fun toString() =
(listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
@ -375,7 +426,14 @@ object InteractiveShell {
log.error("Failed to parse flow ID", e)
//auxiliary validation - workaround for JDK8 bug
if (id.length < uuidStringSize) {
val msg = "Can not kill the flow. Flow ID of '$id' seems to be malformed - a UUID should have $uuidStringSize characters. " +
"Expand the terminal window to see the full UUID value."
if (rpcOps.killFlow(runId)) {
output.println("Killed flow $runId", Color.yellow)
} else {
@ -386,7 +444,6 @@ object InteractiveShell {
// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.
* Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts
* the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable
@ -515,7 +572,7 @@ object InteractiveShell {
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null
} else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) {
return InteractiveShell.gracefulShutdown(out, cordaRPCOps)
return gracefulShutdown(out, cordaRPCOps)
var result: Any? = null
@ -525,7 +582,7 @@ object InteractiveShell {
val call = parser.parse(cordaRPCOps, cmd)
result =
var subscription : Subscriber<*>? = null
if (result != null && result !== kotlin.Unit && result !is Void) {
if (result != null && result !== Unit && result !is Void) {
val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat)
subscription = subs
result = future
@ -568,60 +625,47 @@ object InteractiveShell {
var isShuttingDown = false
try {
display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") }
isShuttingDown = true
display {
println("Orchestrating a clean shutdown, press CTRL+C to cancel...")
println("...enabling draining mode")
println("...waiting for in-flight flows to be completed")
val latch = CountDownLatch(1)
cordaRPCOps.pendingFlowsCount().updates.doOnError { error ->
throw error
// For each update.
{ (first, second) -> display { println("...remaining: $first / $second") } },
// On error.
{ error ->
if (!isShuttingDown) {
display { println("RPC failed: ${error.rootCause}", }
// When completed.
// This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
while (!Thread.currentThread().isInterrupted) {
try {
} catch (e: InterruptedException) {
try {
display { println("...cancelled clean shutdown.") }
} finally {
} catch (e: StringToMethodCallParser.UnparseableCallException) {
display {
println("Please try 'man run' to learn what syntax is acceptable")
val subscription = cordaRPCOps.pendingFlowsCount().updates
// For each update.
{ (completed, total) -> display { println("...remaining: $completed / $total") } },
// On error.
throw it
// When completed.
// This will only show up in the standalone Shell, because the embedded one
// is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
try {
// Unsubscribe or we hold up the shutdown
} catch (e: InterruptedException) {
// Cancelled whilst draining flows. So let's carry on from here
display { println("...cancelled clean shutdown.") }
} catch (e: Exception) {
if (!isShuttingDown) {
display { println("RPC failed: ${e.rootCause}", }
display { println("RPC failed: ${e.rootCause}", }
} finally {
InputStreamSerializer.invokeContext = null