Merge pull request #5646 from corda/my_merge_branch

Merge OS 4.3 -> OS 4.4
This commit is contained in:
Stefano Franz 2019-10-30 15:46:33 +00:00 committed by GitHub
commit da61b9ed2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 871 additions and 328 deletions

View File

@ -0,0 +1,8 @@
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: testing-storage
provisioner: kubernetes.io/azure-file
parameters:
storageAccount: testrestart
location: westeurope

72
.ci/dev/smoke/Jenkinsfile vendored Normal file
View File

@ -0,0 +1,72 @@
@Library('existing-build-control')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
triggers {
issueCommentTrigger('.*smoke tests.*')
}
agent { label 'k8s' }
options { timestamps() }
environment {
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}st"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Smoke Tests') {
steps {
script {
pullRequest.createStatus(status: 'pending',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Building',
targetUrl: "${env.JOB_URL}")
}
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-DpreAllocatePods=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelSmokeTest"
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: false
}
success {
script {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Passed',
targetUrl: "${env.JOB_URL}testResults")
}
}
failure {
script {
pullRequest.createStatus(status: 'failure',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Failed',
targetUrl: "${env.JOB_URL}testResults")
}
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}

View File

@ -3,14 +3,14 @@
<option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" />
<JavaCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<GroovyCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
</GroovyCodeStyleSettings>
<JavaCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<JetCodeStyleSettings>
<option name="PACKAGES_TO_USE_STAR_IMPORTS">
<value>

View File

@ -1,5 +1,6 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

19
Jenkinsfile vendored
View File

@ -22,8 +22,8 @@ pipeline {
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace"
}
sh "kubectl auth can-i get pods"
}
@ -36,10 +36,20 @@ pipeline {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace"
}
}
// stage('Unit Tests') {
// steps {
// sh "./gradlew " +
// "-DbuildId=\"\${BUILD_ID}\" " +
// "-Dkubenetize=true " +
// "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" +
// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace"
// }
// }
}
}
@ -47,6 +57,7 @@ pipeline {
post {
always {
archiveArtifacts artifacts: '**/pod-logs/**/*.log', fingerprint: false
junit '**/build/test-results-xml/**/*.xml'
}
cleanup {

View File

@ -1,7 +1,8 @@
import net.corda.testing.DistributedTesting
import net.corda.testing.DistributeTestsBy
import net.corda.testing.ImageBuilding
import net.corda.testing.Distribution
import net.corda.testing.ParallelTestGroup
import net.corda.testing.PodLogLevel
import static org.gradle.api.JavaVersion.VERSION_11
import static org.gradle.api.JavaVersion.VERSION_1_8
@ -28,8 +29,7 @@ buildscript {
ext.quasar_version = constants.getProperty("quasarVersion11")
ext.quasar_classifier = constants.getProperty("quasarClassifier11")
ext.jdkClassifier = constants.getProperty("jdkClassifier11")
}
else {
} else {
ext.quasar_version = constants.getProperty("quasarVersion")
ext.quasar_classifier = constants.getProperty("quasarClassifier")
ext.jdkClassifier = constants.getProperty("jdkClassifier")
@ -333,7 +333,6 @@ allprojects {
mavenCentral()
jcenter()
maven { url "$artifactory_contextUrl/corda-dependencies" }
maven { url 'https://jitpack.io' }
maven { url 'https://repo.gradle.org/gradle/libs-releases' }
}
@ -597,36 +596,22 @@ buildScan {
}
task allParallelIntegrationTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "integrationTest"
numberOfShards 10
streamOutput false
coresPerFork 6
coresPerFork 5
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest"
numberOfShards 4
streamOutput true
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.METHOD
distribute DistributeTestsBy.CLASS
}
task allParallelUnitTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "test"
numberOfShards 10
streamOutput false
coresPerFork 5
memoryInGbPerFork 6
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
@ -634,7 +619,7 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
@ -642,7 +627,15 @@ task parallelRegressionTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest", "smokeTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
}
apply plugin: ImageBuilding
apply plugin: DistributedTesting

View File

@ -1,8 +1,11 @@
package net.corda.testing
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import org.gradle.api.GradleException
import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.tasks.testing.Test
/**
@ -18,19 +21,22 @@ class DistributedTesting implements Plugin<Project> {
void apply(Project project) {
if (System.getProperty("kubenetize") != null) {
def forks = getPropertyAsInt(project, "dockerForks", 1)
Integer forks = getPropertyAsInt(project, "dockerForks", 1)
ensureImagePluginIsApplied(project)
ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding)
DockerPushImage imageBuildingTask = imagePlugin.pushTask
String providedTag = System.getProperty("docker.tag")
DockerPushImage imagePushTask = imagePlugin.pushTask
DockerBuildImage imageBuildTask = imagePlugin.buildTask
String tagToUseForRunningTests = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
String tagToUseForBuilding = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
BucketingAllocatorTask globalAllocator = project.tasks.create("bucketingAllocator", BucketingAllocatorTask, forks)
def requestedTasks = project.gradle.startParameter.taskNames.collect { project.tasks.findByPath(it) }
Set<String> requestedTaskNames = project.gradle.startParameter.taskNames.toSet()
def requestedTasks = requestedTaskNames.collect { project.tasks.findByPath(it) }
//in each subproject
//1. add the task to determine all tests within the module
//2. modify the underlying testing task to use the output of the listing task to include a subset of tests for each fork
//1. add the task to determine all tests within the module and register this as a source to the global allocator
//2. modify the underlying testing task to use the output of the global allocator to include a subset of tests for each fork
//3. KubesTest will invoke these test tasks in a parallel fashion on a remote k8s cluster
//4. after each completed test write its name to a file to keep track of what finished for restart purposes
project.subprojects { Project subProject ->
@ -45,32 +51,49 @@ class DistributedTesting implements Plugin<Project> {
println "Skipping modification of ${task.getPath()} as it's not scheduled for execution"
}
if (!task.hasProperty("ignoreForDistribution")) {
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag)
//this is what enables execution of a single test suite - for example node:parallelTest would execute all unit tests in node, node:parallelIntegrationTest would do the same for integration tests
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imagePushTask, tagToUseForRunningTests)
}
}
}
//now we are going to create "super" groupings of these KubesTest tasks, so that it is possible to invoke all submodule tests with a single command
//group all kubes tests by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
//now we are going to create "super" groupings of the Test tasks, so that it is possible to invoke all submodule tests with a single command
//group all test Tasks by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<Test>> allTestTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
.flatten()
.findAll { task -> task instanceof KubesTest }
.groupBy { task -> task.taskToExecuteName }
.findAll { task -> task instanceof Test }
.groupBy { Test task -> task.name }
//first step is to create a single task which will invoke all the submodule tasks for each grouping
//ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc]
//ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc]
//ie allUnitAndIntegrationTest will invoke [node:integrationTest, node:test, core:integrationTest, core:test, client:rpc:test , client:rpc:integrationTest ... etc]
Set<ParallelTestGroup> userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup))
Collection<ParallelTestGroup> userDefinedGroups = userGroups.forEach { testGrouping ->
List<KubesTest> groups = ((ParallelTestGroup) testGrouping).groups.collect {
allKubesTestingTasksGroupedByType.get(it)
}.flatten()
String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ")
userGroups.forEach { testGrouping ->
//for each "group" (ie: test, integrationTest) within the grouping find all the Test tasks which have the same name.
List<Test> groups = ((ParallelTestGroup) testGrouping).groups.collect { allTestTasksGroupedByType.get(it) }.flatten()
//join up these test tasks into a single set of tasks to invoke (node:test, node:integrationTest...)
String superListOfTasks = groups.collect { it.path }.join(" ")
//generate a preAllocate / deAllocate task which allows you to "pre-book" a node during the image building phase
//this prevents time lost to cloud provider node spin up time (assuming image build time > provider spin up time)
def (Task preAllocateTask, Task deAllocateTask) = generatePreAllocateAndDeAllocateTasksForGrouping(project, testGrouping)
//modify the image building task to depend on the preAllocate task (if specified on the command line) - this prevents gradle running out of order
if (preAllocateTask.name in requestedTaskNames) {
imageBuildTask.dependsOn preAllocateTask
}
def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" + testGrouping.name.capitalize(), KubesTest) {
if (!providedTag) {
dependsOn imageBuildingTask
if (!tagToUseForRunningTests) {
dependsOn imagePushTask
}
if (deAllocateTask.name in requestedTaskNames) {
dependsOn deAllocateTask
}
numberOfPods = testGrouping.getShardCount()
printOutput = testGrouping.printToStdOut
@ -79,8 +102,9 @@ class DistributedTesting implements Plugin<Project> {
memoryGbPerFork = testGrouping.gbOfMemory
numberOfCoresPerFork = testGrouping.coresToUse
distribution = testGrouping.distribution
podLogLevel = testGrouping.logLevel
doFirst {
dockerTag = dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get())
dockerTag = tagToUseForRunningTests ? (ImageBuilding.registryName + ":" + tagToUseForRunningTests) : (imagePushTask.imageName.get() + ":" + imagePushTask.tag.get())
}
}
def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.name.capitalize()}", KubesReporting) {
@ -99,6 +123,38 @@ class DistributedTesting implements Plugin<Project> {
}
}
private List<Task> generatePreAllocateAndDeAllocateTasksForGrouping(Project project, ParallelTestGroup testGrouping) {
PodAllocator allocator = new PodAllocator(project.getLogger())
Task preAllocateTask = project.rootProject.tasks.create("preAllocateFor" + testGrouping.name.capitalize()) {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
}
int seed = (dockerTag.hashCode() + testGrouping.name.hashCode())
String podPrefix = new BigInteger(64, new Random(seed)).toString(36)
//here we will pre-request the correct number of pods for this testGroup
int numberOfPodsToRequest = testGrouping.getShardCount()
int coresPerPod = testGrouping.getCoresToUse()
int memoryGBPerPod = testGrouping.getGbOfMemory()
allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix)
}
}
Task deAllocateTask = project.rootProject.tasks.create("deAllocateFor" + testGrouping.name.capitalize()) {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
}
int seed = (dockerTag.hashCode() + testGrouping.name.hashCode())
String podPrefix = new BigInteger(64, new Random(seed)).toString(36);
allocator.tearDownPods(podPrefix)
}
}
return [preAllocateTask, deAllocateTask]
}
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()
@ -120,55 +176,47 @@ class DistributedTesting implements Plugin<Project> {
private Test modifyTestTaskForParallelExecution(Project subProject, Test task, BucketingAllocatorTask globalAllocator) {
subProject.logger.info("modifying task: ${task.getPath()} to depend on task ${globalAllocator.getPath()}")
def reportsDir = new File(new File(subProject.rootProject.getBuildDir(), "test-reports"), subProject.name + "-" + task.name)
def reportsDir = new File(new File(KubesTest.TEST_RUN_DIR, "test-reports"), subProject.name + "-" + task.name)
reportsDir.mkdirs()
File executedTestsFile = new File(KubesTest.TEST_RUN_DIR + "/executedTests.txt")
task.configure {
dependsOn globalAllocator
binResultsDir new File(reportsDir, "binary")
reports.junitXml.destination new File(reportsDir, "xml")
maxHeapSize = "6g"
maxHeapSize = "10g"
doFirst {
executedTestsFile.createNewFile()
filter {
List<String> executedTests = []
File executedTestsFile = new File(KubesTest.TEST_RUN_DIR + "/executedTests.txt")
try {
executedTests = executedTestsFile.readLines()
} catch (FileNotFoundException e) {
executedTestsFile.createNewFile()
}
task.afterTest { desc, result ->
executedTestsFile.withWriterAppend { writer ->
writer.writeLine(desc.getClassName() + "." + desc.getName())
}
}
List<String> executedTests = executedTestsFile.readLines()
def fork = getPropertyAsInt(subProject, "dockerFork", 0)
subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (idx: ${fork})")
List<String> includes = globalAllocator.getTestIncludesForForkAndTestTask(
fork,
task)
subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}"
if (includes.size() == 0) {
subProject.logger.info "Disabling test execution for testing task ${task.getPath()}"
excludeTestsMatching "*"
}
includes.removeAll(executedTests)
executedTests.forEach { exclude ->
subProject.logger.info "excluding: $exclude for testing task ${task.getPath()}"
excludeTestsMatching exclude
}
includes.forEach { include ->
subProject.logger.info "including: $include for testing task ${task.getPath()}"
includeTestsMatching include
}
failOnNoMatchingTests false
}
}
afterTest { desc, result ->
executedTestsFile.withWriterAppend { writer ->
writer.writeLine(desc.getClassName() + "." + desc.getName())
}
}
}
return task

View File

@ -1,8 +1,17 @@
package net.corda.testing;
import com.bmuschko.gradle.docker.DockerRegistryCredentials;
import com.bmuschko.gradle.docker.tasks.container.*;
import com.bmuschko.gradle.docker.tasks.image.*;
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer;
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage;
import com.bmuschko.gradle.docker.tasks.image.DockerCommitImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPullImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage;
import com.bmuschko.gradle.docker.tasks.image.DockerRemoveImage;
import com.bmuschko.gradle.docker.tasks.image.DockerTagImage;
import org.gradle.api.GradleException;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
@ -12,6 +21,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* this plugin is responsible for setting up all the required docker image building tasks required for producing and pushing an
@ -20,7 +30,10 @@ import java.util.Map;
public class ImageBuilding implements Plugin<Project> {
public static final String registryName = "stefanotestingcr.azurecr.io/testing";
public static final String PROVIDE_TAG_FOR_BUILDING_PROPERTY = "docker.build.tag";
public static final String PROVIDE_TAG_FOR_RUNNING_PROPERTY = "docker.run.tag";
public DockerPushImage pushTask;
public DockerBuildImage buildTask;
@Override
public void apply(@NotNull final Project project) {
@ -43,6 +56,8 @@ public class ImageBuilding implements Plugin<Project> {
dockerBuildImage.getDockerFile().set(new File(new File("testing"), "Dockerfile"));
});
this.buildTask = buildDockerImageForSource;
final DockerCreateContainer createBuildContainer = project.getTasks().create("createBuildContainer", DockerCreateContainer.class,
dockerCreateContainer -> {
final File baseWorkingDir = new File(System.getProperty("docker.work.dir") != null &&
@ -58,7 +73,7 @@ public class ImageBuilding implements Plugin<Project> {
mavenDir.mkdirs();
}
project.getLogger().info("Will use: ${gradleDir.absolutePath} for caching gradle artifacts");
project.getLogger().info("Will use: " + gradleDir.getAbsolutePath() + " for caching gradle artifacts");
});
dockerCreateContainer.dependsOn(buildDockerImageForSource);
dockerCreateContainer.targetImageId(buildDockerImageForSource.getImageId());
@ -103,8 +118,7 @@ public class ImageBuilding implements Plugin<Project> {
final DockerTagImage tagBuildImageResult = project.getTasks().create("tagBuildImageResult", DockerTagImage.class, dockerTagImage -> {
dockerTagImage.dependsOn(commitBuildImageResult);
dockerTagImage.getImageId().set(commitBuildImageResult.getImageId());
dockerTagImage.getTag().set(System.getProperty("docker.provided.tag",
"${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"));
dockerTagImage.getTag().set(System.getProperty(PROVIDE_TAG_FOR_BUILDING_PROPERTY, UUID.randomUUID().toString().toLowerCase().substring(0, 12)));
dockerTagImage.getRepository().set(registryName);
});

View File

@ -1,22 +1,58 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import net.corda.testing.retry.Retry;
import okhttp3.Response;
import org.apache.commons.compress.utils.IOUtils;
import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;
import org.jetbrains.annotations.NotNull;
import java.io.*;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -33,18 +69,20 @@ public class KubesTest extends DefaultTask {
String fullTaskToExecutePath;
String taskToExecuteName;
Boolean printOutput = false;
Integer numberOfCoresPerFork = 4;
Integer memoryGbPerFork = 6;
public volatile List<File> testOutput = Collections.emptyList();
public volatile List<KubePodResult> containerResults = Collections.emptyList();
String namespace = "thisisatest";
public static String NAMESPACE = "thisisatest";
int k8sTimeout = 50 * 1_000;
int webSocketTimeout = k8sTimeout * 6;
int numberOfPods = 20;
int numberOfPods = 5;
int timeoutInMinutesForPodToStart = 60;
Distribution distribution = Distribution.METHOD;
DistributeTestsBy distribution = DistributeTestsBy.METHOD;
PodLogLevel podLogLevel = PodLogLevel.INFO;
@TaskAction
public void runDistributedTests() {
@ -54,10 +92,8 @@ public class KubesTest extends DefaultTask {
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
String random = rnd64Base36(new Random());
final KubernetesClient client = getKubernetesClient();
try {
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
try (KubernetesClient client = getKubernetesClient()) {
client.pods().inNamespace(NAMESPACE).list().getItems().forEach(podToDelete -> {
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
client.resource(podToDelete).delete();
@ -68,8 +104,9 @@ public class KubesTest extends DefaultTask {
}
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i;
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
String potentialPodName = (taskToExecuteName + "-" + stableRunId + random + i).toLowerCase();
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
return submitBuild(NAMESPACE, numberOfPods, i, podName, printOutput, 3);
}).collect(Collectors.toList());
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
@ -107,8 +144,7 @@ public class KubesTest extends DefaultTask {
.toLowerCase();
}
private Future<KubePodResult> submitBuild(
KubernetesClient client,
private CompletableFuture<KubePodResult> submitBuild(
String namespace,
int numberOfPods,
int podIdx,
@ -116,87 +152,102 @@ public class KubesTest extends DefaultTask {
boolean printOutput,
int numberOfRetries
) {
return executorService.submit(() -> buildRunPodWithRetriesOrThrow(client, namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries));
return CompletableFuture.supplyAsync(() -> {
PersistentVolumeClaim pvc = createPvc(podName);
return buildRunPodWithRetriesOrThrow(namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries, pvc);
}, executorService);
}
private static void addShutdownHook(Runnable hook) {
Runtime.getRuntime().addShutdownHook(new Thread(hook));
}
private PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
.inNamespace(namespace)
.createNew()
.editOrNewMetadata().withName(name).endMetadata()
.editOrNewSpec()
.withAccessModes("ReadWriteOnce")
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
.endSpec()
.done();
private PersistentVolumeClaim createPvc(String name) {
PersistentVolumeClaim pvc;
try (KubernetesClient client = getKubernetesClient()) {
pvc = client.persistentVolumeClaims()
.inNamespace(NAMESPACE)
.createNew()
.editOrNewMetadata().withName(name).endMetadata()
.editOrNewSpec()
.withAccessModes("ReadWriteOnce")
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
.withStorageClassName("testing-storage")
.endSpec()
.done();
}
addShutdownHook(() -> {
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
client.persistentVolumeClaims().delete(pvc);
try (KubernetesClient client = getKubernetesClient()) {
System.out.println("Deleting PVC: " + pvc.getMetadata().getName());
client.persistentVolumeClaims().delete(pvc);
}
});
return pvc;
}
private KubePodResult buildRunPodWithRetriesOrThrow(
KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries
) {
int numberOfRetries,
PersistentVolumeClaim pvc) {
addShutdownHook(() -> {
System.out.println("deleting pod: " + podName);
client.pods().inNamespace(namespace).withName(podName).delete();
try (KubernetesClient client = getKubernetesClient()) {
client.pods().inNamespace(namespace).withName(podName).delete();
}
});
try {
// pods might die, so we retry
return Retry.fixed(numberOfRetries).run(() -> {
// remove pod if exists
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
oldPod.delete();
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
Thread.sleep(1000);
Pod createdPod;
try (KubernetesClient client = getKubernetesClient()) {
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
oldPod.delete();
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
Thread.sleep(1000);
}
}
getProject().getLogger().lifecycle("creating pod: " + podName);
createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName, pvc));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
}
// recreate and run
getProject().getLogger().lifecycle("creating pod: " + podName);
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
attachStatusListenerToPod(client, createdPod);
waitForPodToStart(client, createdPod);
attachStatusListenerToPod(createdPod);
waitForPodToStart(createdPod);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
CompletableFuture<Integer> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
stdOutIs.connect(stdOutOs);
client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener)
.exec(getBuildCommand(numberOfPods, podIdx));
File podOutput = executeBuild(namespace, numberOfPods, podIdx, podName, printOutput, stdOutOs, stdOutIs, errChannelStream, waiter);
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
int resCode = waiter.join();
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ") with result " + resCode + " , gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(namespace, createdPod);
getLogger().lifecycle("removing pod " + podName + " (" + podIdx + "/" + numberOfPods + ") after completed build");
File podLogsDirectory = new File(getProject().getBuildDir(), "pod-logs");
if (!podLogsDirectory.exists()) {
podLogsDirectory.mkdirs();
}
File logFileToArchive = new File(podLogsDirectory, podName + ".log");
try (FileInputStream logIn = new FileInputStream(podOutput); FileOutputStream logOut = new FileOutputStream(logFileToArchive)) {
IOUtils.copy(logIn, logOut);
}
try (KubernetesClient client = getKubernetesClient()) {
client.pods().delete(createdPod);
client.persistentVolumeClaims().delete(pvc);
}
return new KubePodResult(resCode, podOutput, binaryResults);
});
} catch (Retry.RetryException e) {
@ -204,7 +255,32 @@ public class KubesTest extends DefaultTask {
}
}
private Pod buildPodRequest(String podName) {
@NotNull
private File executeBuild(String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
PipedOutputStream stdOutOs,
PipedInputStream stdOutIs,
ByteArrayOutputStream errChannelStream,
CompletableFuture<Integer> waiter) throws IOException {
KubernetesClient client = getKubernetesClient();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, client);
stdOutIs.connect(stdOutOs);
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
getProject().getLogger().quiet("About to execute " + Arrays.stream(buildCommand).reduce("", (s, s2) -> s + " " + s2) + " on pod " + podName);
client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener)
.exec(getBuildCommand(numberOfPods, podIdx));
return startLogPumping(stdOutIs, podIdx, printOutput);
}
private Pod buildPodRequest(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
.withNewMetadata().withName(podName).endMetadata()
@ -217,23 +293,12 @@ public class KubesTest extends DefaultTask {
.withPath("/tmp/gradle")
.endHostPath()
.endVolume()
.addNewVolume()
.withName("testruns")
.withNewHostPath()
.withType("DirectoryOrCreate")
.withPath("/tmp/testruns")
.endHostPath()
.withNewPersistentVolumeClaim()
.withClaimName(pvc.getMetadata().getName())
.endPersistentVolumeClaim()
.endVolume()
// .addNewVolume()
// .withName("testruns")
// .withNewPersistentVolumeClaim()
// .withClaimName(pvc.getMetadata().getName())
// .endPersistentVolumeClaim()
// .endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
@ -283,7 +348,8 @@ public class KubesTest extends DefaultTask {
return outputFile;
}
private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
private Watch attachStatusListenerToPod(Pod pod) {
KubernetesClient client = getKubernetesClient();
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
@Override
public void eventReceived(Watcher.Action action, Pod resource) {
@ -292,22 +358,25 @@ public class KubesTest extends DefaultTask {
@Override
public void onClose(KubernetesClientException cause) {
client.close();
}
});
}
private void waitForPodToStart(KubernetesClient client, Pod pod) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
private void waitForPodToStart(Pod pod) {
try (KubernetesClient client = getKubernetesClient()) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
}
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
}
private Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports";
private Collection<File> downloadTestXmlFromPod(String namespace, Pod cp) {
String resultsInContainerPath = TEST_RUN_DIR + "/test-reports";
String binaryResultsFile = "results.bin";
String podName = cp.getMetadata().getName();
Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath();
@ -315,25 +384,43 @@ public class KubesTest extends DefaultTask {
if (!tempDir.toFile().exists()) {
tempDir.toFile().mkdirs();
}
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir);
try (KubernetesClient client = getKubernetesClient()) {
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir);
}
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
}
private String[] getBuildCommand(int numberOfPods, int podIdx) {
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
String shellScript = "(let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ) && "
+ " cd /tmp/source && " +
"(let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ) && " +
"(./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " " + getLoggingLevel() + " 2>&1) ; " +
"let rs=$? ; sleep 10 ; exit ${rs}";
return new String[]{"bash", "-c", shellScript};
}
private String getLoggingLevel() {
switch (podLogLevel) {
case INFO:
return " --info";
case WARN:
return " --warn";
case QUIET:
return " --quiet";
case DEBUG:
return " --debug";
default:
throw new IllegalArgumentException("LogLevel: " + podLogLevel + " is unknown");
}
}
private List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
List<File> folders = new ArrayList<>();
@ -344,13 +431,13 @@ public class KubesTest extends DefaultTask {
}
if (fileToInspect.isDirectory()) {
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()));
filesToInspect.addAll(Arrays.stream(Optional.ofNullable(fileToInspect.listFiles()).orElse(new File[]{})).collect(Collectors.toList()));
}
}
return folders;
}
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture) {
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture, KubernetesClient client) {
return new ExecListener() {
final Long start = System.currentTimeMillis();
@ -362,7 +449,7 @@ public class KubesTest extends DefaultTask {
@Override
public void onFailure(Throwable t, Response response) {
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
getProject().getLogger().lifecycle("Received error from pod " + podName);
waitingFuture.completeExceptionally(t);
}
@ -380,6 +467,8 @@ public class KubesTest extends DefaultTask {
waitingFuture.complete(resultCode);
} catch (Exception e) {
waitingFuture.completeExceptionally(e);
} finally {
client.close();
}
}
};

View File

@ -47,7 +47,7 @@ class ListTests extends DefaultTask implements TestLister {
FileCollection scanClassPath
List<String> allTests
Distribution distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? Distribution.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : Distribution.METHOD
DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD
def getTestsForFork(int fork, int forks, Integer seed) {
def gitSha = new BigInteger(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0", 36)
@ -66,7 +66,7 @@ class ListTests extends DefaultTask implements TestLister {
@TaskAction
def discoverTests() {
switch (distribution) {
case Distribution.METHOD:
case DistributeTestsBy.METHOD:
Collection<String> results = new ClassGraph()
.enableClassInfo()
.enableMethodInfo()
@ -85,7 +85,7 @@ class ListTests extends DefaultTask implements TestLister {
this.allTests = results.stream().sorted().collect(Collectors.toList())
break
case Distribution.CLASS:
case DistributeTestsBy.CLASS:
Collection<String> results = new ClassGraph()
.enableClassInfo()
.enableMethodInfo()
@ -105,6 +105,6 @@ class ListTests extends DefaultTask implements TestLister {
}
}
public enum Distribution {
public enum DistributeTestsBy {
CLASS, METHOD
}

View File

@ -3,19 +3,25 @@ package net.corda.testing
import org.gradle.api.DefaultTask
class ParallelTestGroup extends DefaultTask {
Distribution distribution = Distribution.METHOD
DistributeTestsBy distribution = DistributeTestsBy.METHOD
List<String> groups = new ArrayList<>()
int shardCount = 20
int coresToUse = 4
int gbOfMemory = 4
boolean printToStdOut = true
PodLogLevel logLevel = PodLogLevel.INFO
void numberOfShards(int shards) {
this.shardCount = shards
}
void distribute(Distribution dist){
void podLogLevel(PodLogLevel level) {
this.logLevel = level
}
void distribute(DistributeTestsBy dist) {
this.distribution = dist
}

View File

@ -0,0 +1,135 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class PodAllocator {
private static final int CONNECTION_TIMEOUT = 60_1000;
private final Logger logger;
public PodAllocator(Logger logger) {
this.logger = logger;
}
public PodAllocator() {
this.logger = LoggerFactory.getLogger(PodAllocator.class);
}
public void allocatePods(Integer number, Integer coresPerPod, Integer memoryPerPod, String prefix) {
Config config = new ConfigBuilder()
.withConnectionTimeout(CONNECTION_TIMEOUT)
.withRequestTimeout(CONNECTION_TIMEOUT)
.withRollingTimeout(CONNECTION_TIMEOUT)
.withWebsocketTimeout(CONNECTION_TIMEOUT)
.withWebsocketPingInterval(CONNECTION_TIMEOUT)
.build();
KubernetesClient client = new DefaultKubernetesClient(config);
List<Pod> podsToRequest = IntStream.range(0, number).mapToObj(i -> buildPod("pa-" + prefix + i, coresPerPod, memoryPerPod)).collect(Collectors.toList());
podsToRequest.forEach(requestedPod -> {
String msg = "PreAllocating " + requestedPod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(msg);
} else {
logger.info(msg);
}
client.pods().inNamespace(KubesTest.NAMESPACE).create(requestedPod);
});
}
public void tearDownPods(String prefix) {
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
.withConnectionTimeout(CONNECTION_TIMEOUT)
.withRequestTimeout(CONNECTION_TIMEOUT)
.withRollingTimeout(CONNECTION_TIMEOUT)
.withWebsocketTimeout(CONNECTION_TIMEOUT)
.withWebsocketPingInterval(CONNECTION_TIMEOUT)
.build();
KubernetesClient client = new DefaultKubernetesClient(config);
Stream<Pod> podsToDelete = client.pods().inNamespace(KubesTest.NAMESPACE).list()
.getItems()
.stream()
.sorted(Comparator.comparing(p -> p.getMetadata().getName()))
.filter(foundPod -> foundPod.getMetadata().getName().contains(prefix));
List<CompletableFuture<Pod>> deleteFutures = podsToDelete.map(pod -> {
CompletableFuture<Pod> result = new CompletableFuture<>();
Watch watch = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
if (action == Action.DELETED) {
result.complete(resource);
String msg = "Successfully deleted pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).lifecycle(msg);
} else {
logger.info(msg);
}
}
}
@Override
public void onClose(KubernetesClientException cause) {
String message = "Failed to delete pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(message);
} else {
logger.info(message);
}
result.completeExceptionally(cause);
}
});
client.pods().delete(pod);
return result;
}).collect(Collectors.toList());
try {
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//ignore - there's nothing left to do
}
}
Pod buildPod(String podName, Integer coresPerPod, Integer memoryPerPod) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewContainer()
.withImage("busybox:latest")
.withCommand("sh")
.withArgs("-c", "sleep 300")
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity(coresPerPod.toString()))
.addToRequests("memory", new Quantity(memoryPerPod.toString() + "Gi"))
.endResources()
.endContainer()
.withRestartPolicy("Never")
.endSpec()
.build();
}
}

View File

@ -0,0 +1,5 @@
package net.corda.testing;
public enum PodLogLevel {
QUIET, WARN, INFO, DEBUG
}

View File

@ -75,13 +75,13 @@ class CordaRPCConnection private constructor(
override val serverProtocolVersion: Int get() = actualConnection.serverProtocolVersion
override fun notifyServerAndClose() = actualConnection.notifyServerAndClose()
override fun notifyServerAndClose() = doCloseLogic { actualConnection.notifyServerAndClose() }
override fun forceClose() = actualConnection.forceClose()
override fun forceClose() = doCloseLogic { actualConnection.forceClose() }
override fun close() {
private inline fun doCloseLogic(close: () -> Unit) {
try {
actualConnection.close()
close.invoke()
} finally {
observersPool?.apply {
shutdown()
@ -286,6 +286,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* The default value is 5.
*/
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
@Suppress("unused") // constructor for java
@JvmOverloads
constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) :
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxAttempts = maxAttempts)
@ -321,11 +322,15 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
*
* If you want to enable a more graceful form of reconnection, you can make use of the gracefulReconnect argument of the [start] method.
* If this is set to true, then:
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for the subscribed [rx.Observable]s.
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or
* multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been
* re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for
* the subscribed [rx.Observable]s.
* Note: In this approach, some events might be lost during a re-connection and not sent in the subscribed [rx.Observable]s.
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing a [CouldNotStartFlowException].
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing
* a [CouldNotStartFlowException].
*
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
@ -333,7 +338,8 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s and [SerializationWhitelist]s
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s
* and [SerializationWhitelist]s
* If the created RPC client is intended to use types with custom serializers / whitelists,
* a classloader will need to be provided that contains the associated CorDapp jars.
*/

View File

@ -25,6 +25,9 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.
* If the server is not available this method may block for a short period until it's clear the server is not
* coming back.
*
* Note: this will also be the implementation of [close] so won't be needed when using [use] or `try-with-resources`
* blocks.
*/
fun notifyServerAndClose()
@ -37,4 +40,6 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* block waiting for it to come back, which typically happens in integration tests and demos rather than production.
*/
fun forceClose()
override fun close() = notifyServerAndClose()
}

View File

@ -120,10 +120,6 @@ class RPCClient<I : RPCOps>(
override fun forceClose() {
close(false)
}
override fun close() {
close(true)
}
}
} catch (exception: Throwable) {
proxyHandler.notifyServerAndClose()

View File

@ -1,7 +1,18 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.*
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.DIED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
@ -89,7 +100,12 @@ class ReconnectingCordaRPCOps private constructor(
*
* Note that this method does not guarantee 100% that the flow will not be started twice.
*/
fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
fun runFlowWithLogicalRetry(
runFlow: (CordaRPCOps) -> StateMachineRunId,
hasFlowStarted: (CordaRPCOps) -> Boolean,
onFlowConfirmed: () -> Unit = {},
timeout: Duration = 4.seconds
) {
try {
runFlow(this)
onFlowConfirmed()
@ -149,7 +165,7 @@ class ReconnectingCordaRPCOps private constructor(
currentState = DIED
gracefulReconnect.onDisconnect.invoke()
//TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.warn("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e)
connect()
previousConnection?.forceClose()
@ -233,11 +249,6 @@ class ReconnectingCordaRPCOps private constructor(
currentState = CLOSED
currentRPCConnection?.forceClose()
}
@Synchronized
override fun close() {
currentState = CLOSED
currentRPCConnection?.close()
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
@ -267,22 +278,22 @@ class ReconnectingCordaRPCOps private constructor(
} catch (e: InvocationTargetException) {
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
log.warn("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.reconnectOnError(e)
}
is ConnectionFailureException -> {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}
is RPCException -> {
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
checkIfIsStartFlow(method, e)
}
else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}

View File

@ -161,7 +161,6 @@
<ID>ComplexMethod:IRS.kt$InterestRateSwap.FloatingLeg$override fun equals(other: Any?): Boolean</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List&lt;String&gt;, out: RenderPrintWriter, context: InvocationContext&lt;out Any&gt;, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any?</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun read(kryo: Kryo, input: Input, type: Class&lt;T&gt;): T</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun write(kryo: Kryo, output: Output, obj: T)</ID>
@ -392,7 +391,6 @@
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Resurrect or reimplement the mail plugin.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Review or fix the JVM commands which have bitrotted and some are useless.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Currently the certificate revocation status is not handled here. Nowhere in the code the second parameter is used. Consider adding the support in the future.</ID>
<ID>ForbiddenComment:IrsDemoClientApi.kt$IRSDemoClientApi$// TODO: Add uploading of files to the HTTP API</ID>
@ -1949,7 +1947,6 @@
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$error("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, $it")</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$is SchemaManagementException -&gt; throw HibernateSchemaChangeException("Incompatible schema change detected. Please run the node with database.initialiseSchema=true. Reason: ${e.message}", e)</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?</ID>
<ID>MaxLineLength:CordaRPCClient.kt$CordaRPCClient</ID>
<ID>MaxLineLength:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(100.POUNDS, OpaqueBytes.of(1), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()</ID>
@ -1965,11 +1962,6 @@
<ID>MaxLineLength:CordaRPCOps.kt$CordaRPCOps$fun &lt;T : ContractState&gt; vaultTrackByWithSorting(contractStateType: Class&lt;out T&gt;, criteria: QueryCriteria, sorting: Sort): DataFeed&lt;Vault.Page&lt;T&gt;, Vault.Update&lt;T&gt;&gt;</ID>
<ID>MaxLineLength:CordaRPCOps.kt$StateMachineInfo$return copy(id = id, flowLogicClassName = flowLogicClassName, initiator = initiator, progressTrackerStepAndUpdates = progressTrackerStepAndUpdates, invocationContext = invocationContext)</ID>
<ID>MaxLineLength:CordaRPCOps.kt$sorting: Sort = Sort(emptySet())</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$is ConnectException -&gt; throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$override</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl${ error -&gt; logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$assertThatCode { rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow() }.doesNotThrowAnyException()</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$val cash = rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow().stx.tx.outRefsOfType&lt;Cash.State&gt;().single()</ID>
<ID>MaxLineLength:CordaSSHAuthInfo.kt$CordaSSHAuthInfo : AuthInfo</ID>
@ -3151,7 +3143,6 @@
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveStateAndRefFlow&lt;out T : ContractState&gt; : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow$private val statesToRecord: StatesToRecord = StatesToRecord.NONE</ID>
<ID>MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps$ fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -&gt; StateMachineRunId, hasFlowStarted: (CordaRPCOps) -&gt; Boolean, onFlowConfirmed: () -&gt; Unit = {}, timeout: Duration = 4.seconds)</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$output(ExampleContract::class.java.typeName, "UPDATED REF DATA", "REF DATA".output&lt;ExampleState&gt;().copy(data = "NEW STUFF!"))</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$val stateAndRef = StateAndRef(TransactionState(state, CONTRACT_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint), StateRef(SecureHash.zeroHash, 0))</ID>
<ID>MaxLineLength:ReferencedStatesFlowTests.kt$ReferencedStatesFlowTests$assertEquals(2, nodes[2].services.vaultService.queryBy&lt;LinearState&gt;(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)</ID>
@ -3803,7 +3794,6 @@
<ID>NestedBlockDepth:FetchDataFlow.kt$FetchAttachmentsFlow$override fun maybeWriteToDisk(downloaded: List&lt;Attachment&gt;)</ID>
<ID>NestedBlockDepth:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>NestedBlockDepth:InternalUtils.kt$ inline fun &lt;T&gt; Iterable&lt;T&gt;.noneOrSingle(predicate: (T) -&gt; Boolean): T?</ID>
<ID>NestedBlockDepth:JarSignatureTestUtils.kt$JarSignatureTestUtils$fun Path.addManifest(fileName: String, vararg entries: Pair&lt;Attributes.Name, String&gt;)</ID>
<ID>NestedBlockDepth:Main.kt$Node$fun avalancheLoop()</ID>
@ -4575,9 +4565,6 @@
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
@ -4723,9 +4710,6 @@
<ID>WildcardImport:InputStreamSerializer.kt$import net.corda.serialization.internal.amqp.*</ID>
<ID>WildcardImport:InstallFactory.kt$import tornadofx.*</ID>
<ID>WildcardImport:InstallShellExtensionsParser.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import java.lang.reflect.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:InterestRatesSwapDemoAPI.kt$import org.springframework.web.bind.annotation.*</ID>
@ -4977,8 +4961,6 @@
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:ReceiveTransactionFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*</ID>
<ID>WildcardImport:ReferenceInputStateTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.flows.*</ID>

View File

@ -51,7 +51,7 @@ anything set earlier.
.. code-block:: none
custom = {
jvmArgs: [ '-Xmx1G', '-XX:+UseG1GC' ]
jvmArgs: [ "-Xmx1G", "-XX:+UseG1GC" ]
}
Note that this will completely replace any defaults set by capsule above, not just the flags that are set here, so if you use this
@ -85,7 +85,7 @@ anything set earlier.
.. code-block:: none
custom = {
jvmArgs: [ '-Xmx1G', '-XX:+UseG1GC', '-XX:-HeapDumpOnOutOfMemoryError' ]
jvmArgs: [ "-Xmx1G", "-XX:+UseG1GC", "-XX:-HeapDumpOnOutOfMemoryError" ]
}

View File

@ -17,16 +17,34 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.AttachmentTrustInfo
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.sign
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeDiagnosticInfo
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
@ -120,14 +138,16 @@ internal class CordaRPCOpsImpl(
return services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
}
@Suppress("OverridingDeprecatedMember")
@Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
val (snapshot, updates) = @Suppress("DEPRECATION") internalVerifiedTransactionsFeed()
val (snapshot, updates) = internalVerifiedTransactionsFeed()
updates.notUsed()
return snapshot
}
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId)
@Suppress("OverridingDeprecatedMember")
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? =
services.validatedTransactions.getTransaction(txnId)
@Suppress("OverridingDeprecatedMember")
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
@ -164,7 +184,8 @@ internal class CordaRPCOpsImpl(
return snapshot
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
override fun stateMachineRecordedTransactionMappingFeed():
DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return services.stateMachineRecordedTransactionMapping.track()
}
@ -292,7 +313,8 @@ internal class CordaRPCOpsImpl(
services.networkMapUpdater.updateNetworkMapCache()
} catch (e: Exception) {
when (e) {
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes " +
"are incorrect configuration or network map service being down")
else -> throw e
}
}
@ -302,15 +324,26 @@ internal class CordaRPCOpsImpl(
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByCriteria(
criteria: QueryCriteria,
contractStateType: Class<out T>
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): Vault.Page<T> {
return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType)
}
@ -318,15 +351,26 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByCriteria(
contractStateType: Class<out T>,
criteria: QueryCriteria
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
}
@ -349,7 +393,10 @@ internal class CordaRPCOpsImpl(
.doOnCompleted(shutdownNode::invoke)
.subscribe(
{ }, // Nothing to do on each update here, only completion matters.
{ error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }
{ error ->
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. " +
"Cause was: ${error.message}", error)
}
)
drainingShutdownHook.set(subscription)
} else {
@ -375,7 +422,13 @@ internal class CordaRPCOpsImpl(
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
return StateMachineInfo(
flowLogic.runId,
flowLogic.javaClass.name,
flowLogic.stateMachine.context.toFlowInitiator(),
flowLogic.track(),
flowLogic.stateMachine.context
)
}
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {

View File

@ -109,7 +109,10 @@ class HibernateInteractionTests {
object PersistenceSchema: MappedSchema(PersistenceSchema::class.java, 1, listOf(Parent::class.java, Child::class.java)) {
@Entity(name = "parents")
override val migrationResource: String?
get() = "hibernate-interactions-tests-schema"
@Entity(name = "parentstates")
@Table
class Parent: PersistentState() {
@ -122,7 +125,7 @@ class HibernateInteractionTests {
}
}
@Entity(name = "children")
@Entity(name = "childstates")
class Child(
@Id
// Do not change this: this generation type is required in order to trigger the proper cascade ordering.

View File

@ -0,0 +1,45 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<changeSet author="R3.Corda" id="hibernate-interaction-tests-changeset">
<createTable tableName="parentstates">
<column name="transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="output_index" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="output_index, transaction_id" constraintName="parents_pkey" tableName="parentstates"/>
<createTable tableName="childstates">
<column name="identifier" type="INT" autoIncrement="true">
<constraints nullable="false"/>
</column>
<column name="member" type="NVARCHAR(255)">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="identifier" constraintName="children_pkey" tableName="childstates"/>
<createTable tableName="parentstates_childstates">
<column name="parentstates_output_index" type="INT">
<constraints nullable="false"/>
</column>
<column name="parentstates_transaction_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="children_identifier" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="parentstates_output_index, parentstates_transaction_id, children_identifier" constraintName="parents_children_pkey" tableName="parentstates_childstates"/>
<addForeignKeyConstraint baseColumnNames="children_identifier" baseTableName="parentstates_childstates"
constraintName="FK_to_child"
referencedColumnNames="identifier" referencedTableName="childstates"/>
<addForeignKeyConstraint baseColumnNames="parentstates_output_index, parentstates_transaction_id" baseTableName="parentstates_childstates"
constraintName="FK_to_parent"
referencedColumnNames="output_index, transaction_id" referencedTableName="parentstates"/>
</changeSet>
</databaseChangeLog>

View File

@ -81,7 +81,8 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
"InvokeRpc.openAttachment",
"InvokeRpc.uploadAttachment",
"InvokeRpc.internalVerifiedTransactionsFeed",
"InvokeRpc.startTrackedFlowDynamic"]]]
"InvokeRpc.startTrackedFlowDynamic",
"InvokeRpc.nodeInfo"]]]
nodeDefaults {
projectCordapp {

View File

@ -57,7 +57,6 @@ repositories {
maven {
url 'https://dl.bintray.com/palantir/releases' // docker-compose-rule is published on bintray
}
maven { url 'https://jitpack.io' }
}
dependencies {

View File

@ -8,6 +8,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@ -24,6 +25,19 @@ class SharedMemoryIncremental extends PortAllocation {
private int startPort;
private int endPort;
private MappedByteBuffer mb;
private Long startingAddress;
private File file = new File(System.getProperty("user.home"), "corda-" + startPort + "-to-" + endPort + "-port-allocator.bin");
private RandomAccessFile backingFile;
{
try {
backingFile = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}
private SharedMemoryIncremental(int startPort, int endPort) {
this.startPort = startPort;
this.endPort = endPort;
@ -35,21 +49,9 @@ class SharedMemoryIncremental extends PortAllocation {
}
}
private File file = new File(System.getProperty("user.home"), "corda-" + startPort + "-to-" + endPort + "-port-allocator.bin");
private RandomAccessFile backingFile;
{
try {
backingFile = new RandomAccessFile(file, "rw");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
private MappedByteBuffer mb;
private Long startingAddress;
public static SharedMemoryIncremental INSTANCE = new SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT);
static private Unsafe UNSAFE = getUnsafe();
static private Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
@ -63,17 +65,31 @@ class SharedMemoryIncremental extends PortAllocation {
@Override
public int nextPort() {
Long oldValue;
Long newValue;
long oldValue;
long newValue;
boolean loopSuccess;
do {
oldValue = UNSAFE.getLongVolatile(null, startingAddress);
if (oldValue + 1 >= endPort || oldValue < startPort) {
newValue = Long.valueOf(startPort);
newValue = startPort;
} else {
newValue = (oldValue + 1);
}
} while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue));
boolean reserveSuccess = UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue);
boolean portAvailable = isLocalPortAvailable(newValue);
loopSuccess = reserveSuccess && portAvailable;
} while (!loopSuccess);
return newValue.intValue();
return (int) newValue;
}
private boolean isLocalPortAvailable(Long portToTest) {
try (ServerSocket serverSocket = new ServerSocket(Math.toIntExact(portToTest))) {
} catch (Exception e) {
return false;
}
return true;
}
}

View File

@ -11,6 +11,7 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.notUsed
@ -19,11 +20,21 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.Emoji
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.packageName_
import net.corda.core.internal.rootCause
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext
@ -52,12 +63,17 @@ import java.io.FileDescriptor
import java.io.FileInputStream
import java.io.InputStream
import java.io.PrintWriter
import java.lang.reflect.*
import java.lang.reflect.GenericArrayType
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.UndeclaredThrowableException
import java.nio.file.Path
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
// TODO: Add command history.
@ -76,11 +92,12 @@ object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
private lateinit var ops: InternalCordaRPCOps
private lateinit var rpcConn: AutoCloseable
private lateinit var rpcConn: CordaRPCConnection
private var shell: Shell? = null
private var classLoader: ClassLoader? = null
private lateinit var shellConfiguration: ShellConfiguration
private var onExit: () -> Unit = {}
private const val uuidStringSize = 36
@JvmStatic
fun getCordappsClassloader() = classLoader
@ -131,13 +148,41 @@ object InteractiveShell {
}
}
ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java)
ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand(
"output-format",
"Commands to inspect and update the output format.",
OutputFormatCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"run",
"Runs a method from the CordaRPCOps interface on the node.",
RunShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"flow",
"Commands to work with flows. Flows are how you can change the ledger.",
FlowShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"start",
"An alias for 'flow start'",
StartShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"hashLookup",
"Checks if a transaction with matching Id hash exists.",
HashLookupShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"attachments",
"Commands to extract information about attachments stored within the node",
AttachmentShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"checkpoints",
"Commands to extract information about checkpoints stored within the node",
CheckpointShellCommand::class.java
)
shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password, runSshDaemon)
}
@ -294,7 +339,12 @@ object InteractiveShell {
try {
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper)
val stateObservable = runFlowFromString(
{ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) },
inputData,
flowClazz,
inputObjectMapper
)
latch = CountDownLatch(1)
ansiProgressRenderer.render(stateObservable, latch::countDown)
@ -327,7 +377,8 @@ object InteractiveShell {
}
class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) {
override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
override fun toString() =
(listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
}
/**
@ -375,7 +426,14 @@ object InteractiveShell {
log.error("Failed to parse flow ID", e)
return
}
//auxiliary validation - workaround for JDK8 bug https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8159339
if (id.length < uuidStringSize) {
val msg = "Can not kill the flow. Flow ID of '$id' seems to be malformed - a UUID should have $uuidStringSize characters. " +
"Expand the terminal window to see the full UUID value."
output.println(msg, Color.red)
log.warn(msg)
return
}
if (rpcOps.killFlow(runId)) {
output.println("Killed flow $runId", Color.yellow)
} else {
@ -386,7 +444,6 @@ object InteractiveShell {
}
}
// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.
/**
* Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts
* the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable
@ -515,7 +572,7 @@ object InteractiveShell {
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null
} else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) {
return InteractiveShell.gracefulShutdown(out, cordaRPCOps)
return gracefulShutdown(out, cordaRPCOps)
}
var result: Any? = null
@ -525,7 +582,7 @@ object InteractiveShell {
val call = parser.parse(cordaRPCOps, cmd)
result = call.call()
var subscription : Subscriber<*>? = null
if (result != null && result !== kotlin.Unit && result !is Void) {
if (result != null && result !== Unit && result !is Void) {
val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat)
subscription = subs
result = future
@ -568,60 +625,47 @@ object InteractiveShell {
userSessionOut.flush()
}
var isShuttingDown = false
try {
display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") }
isShuttingDown = true
display {
println("Orchestrating a clean shutdown, press CTRL+C to cancel...")
println("...enabling draining mode")
println("...waiting for in-flight flows to be completed")
}
cordaRPCOps.terminate(true)
val latch = CountDownLatch(1)
@Suppress("DEPRECATION")
cordaRPCOps.pendingFlowsCount().updates.doOnError { error ->
log.error(error.message)
throw error
}.doAfterTerminate(latch::countDown).subscribe(
// For each update.
{ (first, second) -> display { println("...remaining: $first / $second") } },
// On error.
{ error ->
if (!isShuttingDown) {
display { println("RPC failed: ${error.rootCause}", Color.red) }
}
},
// When completed.
{
rpcConn.close()
// This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
onExit.invoke()
})
while (!Thread.currentThread().isInterrupted) {
try {
latch.await()
break
} catch (e: InterruptedException) {
try {
cordaRPCOps.setFlowsDrainingModeEnabled(false)
display { println("...cancelled clean shutdown.") }
} finally {
Thread.currentThread().interrupt()
break
}
}
}
} catch (e: StringToMethodCallParser.UnparseableCallException) {
display {
println(e.message, Color.red)
println("Please try 'man run' to learn what syntax is acceptable")
val subscription = cordaRPCOps.pendingFlowsCount().updates
.doAfterTerminate(latch::countDown)
.subscribe(
// For each update.
{ (completed, total) -> display { println("...remaining: $completed / $total") } },
// On error.
{
log.error(it.message)
throw it
},
// When completed.
{
// This will only show up in the standalone Shell, because the embedded one
// is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
}
)
cordaRPCOps.terminate(true)
try {
latch.await()
// Unsubscribe or we hold up the shutdown
subscription.unsubscribe()
rpcConn.forceClose()
onExit.invoke()
} catch (e: InterruptedException) {
// Cancelled whilst draining flows. So let's carry on from here
cordaRPCOps.setFlowsDrainingModeEnabled(false)
display { println("...cancelled clean shutdown.") }
}
} catch (e: Exception) {
if (!isShuttingDown) {
display { println("RPC failed: ${e.rootCause}", Color.red) }
}
display { println("RPC failed: ${e.rootCause}", Color.red) }
} finally {
InputStreamSerializer.invokeContext = null
InputStreamDeserializer.closeAll()