PreAllocate pod resources during image build phase (#5587)

* use zulu for jdk
add some parallel groups

* port kubesTest to Java
remove asterisk from tests listed by ListTests; instead add it after allocation

* attempt to set up unit test builds with correct GitHub integrations

# Conflicts:
#	.ci/dev/unit/Jenkinsfile

* fix issue with github context

* add credentials block

* start pre-allocating pods for builds

* test

* add blocks for reporting build stages

* add logic to preallocate pods during image building

* tidy up Jenkinsfile for unit tests

* add magic command line flag to enable preallocation of pods

* make docker tag deterministic

* fix issue concatenating docker tag inputs

* add build type specific Jenkinsfile

* try new preallocation approach

* make pre-allocation prefix group specific

* force deAllocator to wait for pods to be actually deleted

* revert jenkinsfiles in .ci

* use smarter waiting logic to address review comments

* add --stacktrace to builds to help debugging

* fix issue with closed stream

* add some logging around preallocation

* tidy up by refactoring (de)allocate task generation into a method

* change default from 20 pods to 5 pods

* fix issue where docker tag was unstable between building and running tests

* more documentation

* add some infrastructure around setting the log level for a given build

* change preallocation pod duration to 5min

* see if it is fast enough when using combined unit and integration tests

* disable unit tests

* print out test summaries

* try to make the kubes client a per-use object rather than a long-lived one. This is step one of making GKE use possible

* add log line about what command is executed in the pod
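
A note on the mechanism several of these bullets rely on (deterministic docker tag, group-specific prefix, tag stability between building and running): the build and test phases independently derive the same pod-name prefix from a seeded Random, so no state has to be passed between them. A minimal sketch of that derivation, mirroring the PodAllocator wiring in the diff below (the tag value is made up for illustration):

    import java.math.BigInteger;
    import java.util.Random;

    class PodPrefixSketch {
        // The same docker tag + test-group name always yield the same prefix, so pods
        // pre-allocated during the image build can be found and torn down later.
        static String podPrefix(String dockerTag, String groupName) {
            int seed = dockerTag.hashCode() + groupName.hashCode();
            return new BigInteger(64, new Random(seed)).toString(36);
        }

        public static void main(String[] args) {
            System.out.println(podPrefix("0123abcdit", "allParallelIntegrationTest"));
        }
    }
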
Stefano Franz 2019-10-28 11:48:04 +00:00 committed by GitHub
parent e2836b1106
commit f9890a5359
10 changed files with 493 additions and 124 deletions

.ci/dev/smoke/Jenkinsfile (vendored, new file, 72 lines)

@@ -0,0 +1,72 @@
@Library('existing-build-control')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
triggers {
issueCommentTrigger('.*smoke tests.*')
}
agent { label 'k8s' }
options { timestamps() }
environment {
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}st"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Smoke Tests') {
steps {
script {
pullRequest.createStatus(status: 'pending',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Building',
targetUrl: "${env.JOB_URL}")
}
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-DpreAllocatePods=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelSmokeTest"
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: false
}
success {
script {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Passed',
targetUrl: "${env.JOB_URL}testResults")
}
}
failure {
script {
pullRequest.createStatus(status: 'failure',
context: 'continuous-integration/jenkins/pr-merge/smokeTest',
description: 'Smoke Tests Failed',
targetUrl: "${env.JOB_URL}testResults")
}
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}

Jenkinsfile (vendored, 18 changed lines)

@@ -22,8 +22,8 @@ pipeline {
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace"
}
sh "kubectl auth can-i get pods"
}
@@ -36,10 +36,20 @@ pipeline {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace"
}
}
// stage('Unit Tests') {
// steps {
// sh "./gradlew " +
// "-DbuildId=\"\${BUILD_ID}\" " +
// "-Dkubenetize=true " +
// "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" +
// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace"
// }
// }
}
}
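
The net effect of the two stages above: pre-allocation is keyed by -Ddocker.build.tag during the image build, and the matching de-allocation by -Ddocker.run.tag before the tests execute. Recapping the two gradle invocations from this Jenkinsfile (buildId, kubenetize and docker credential flags elided):

    # image build: push the build image and pre-book pods for the test group
    ./gradlew -Ddocker.build.tag="${DOCKER_TAG_TO_USE}" clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace

    # test run: tear down the placeholder pods, then run the tests on real ones
    ./gradlew -Ddocker.run.tag="${DOCKER_TAG_TO_USE}" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace
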

build.gradle

@@ -1,7 +1,8 @@
import net.corda.testing.DistributedTesting
import net.corda.testing.DistributeTestsBy
import net.corda.testing.ImageBuilding
import net.corda.testing.Distribution
import net.corda.testing.ParallelTestGroup
import net.corda.testing.PodLogLevel
import static org.gradle.api.JavaVersion.VERSION_11
import static org.gradle.api.JavaVersion.VERSION_1_8
@@ -28,8 +29,7 @@ buildscript {
ext.quasar_version = constants.getProperty("quasarVersion11")
ext.quasar_classifier = constants.getProperty("quasarClassifier11")
ext.jdkClassifier = constants.getProperty("jdkClassifier11")
}
else {
} else {
ext.quasar_version = constants.getProperty("quasarVersion")
ext.quasar_classifier = constants.getProperty("quasarClassifier")
ext.jdkClassifier = constants.getProperty("jdkClassifier")
@@ -596,36 +596,22 @@ buildScan {
}
task allParallelIntegrationTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "integrationTest"
numberOfShards 10
streamOutput false
coresPerFork 6
coresPerFork 5
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest"
numberOfShards 4
streamOutput true
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.METHOD
distribute DistributeTestsBy.CLASS
}
task allParallelUnitTest(type: ParallelTestGroup) {
podLogLevel PodLogLevel.INFO
testGroups "test"
numberOfShards 10
streamOutput false
coresPerFork 5
memoryInGbPerFork 6
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
@@ -633,7 +619,7 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
@@ -641,7 +627,15 @@ task parallelRegressionTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
distribute DistributeTestsBy.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest", "smokeTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
}
apply plugin: ImageBuilding
apply plugin: DistributedTesting

DistributedTesting.groovy

@@ -1,8 +1,11 @@
package net.corda.testing
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import org.gradle.api.GradleException
import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.tasks.testing.Test
/**
@@ -18,19 +21,22 @@ class DistributedTesting implements Plugin<Project> {
void apply(Project project) {
if (System.getProperty("kubenetize") != null) {
def forks = getPropertyAsInt(project, "dockerForks", 1)
Integer forks = getPropertyAsInt(project, "dockerForks", 1)
ensureImagePluginIsApplied(project)
ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding)
DockerPushImage imageBuildingTask = imagePlugin.pushTask
String providedTag = System.getProperty("docker.tag")
DockerPushImage imagePushTask = imagePlugin.pushTask
DockerBuildImage imageBuildTask = imagePlugin.buildTask
String tagToUseForRunningTests = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
String tagToUseForBuilding = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
BucketingAllocatorTask globalAllocator = project.tasks.create("bucketingAllocator", BucketingAllocatorTask, forks)
def requestedTasks = project.gradle.startParameter.taskNames.collect { project.tasks.findByPath(it) }
Set<String> requestedTaskNames = project.gradle.startParameter.taskNames.toSet()
def requestedTasks = requestedTaskNames.collect { project.tasks.findByPath(it) }
//in each subproject
//1. add the task to determine all tests within the module
//2. modify the underlying testing task to use the output of the listing task to include a subset of tests for each fork
//1. add the task to determine all tests within the module and register this as a source to the global allocator
//2. modify the underlying testing task to use the output of the global allocator to include a subset of tests for each fork
//3. KubesTest will invoke these test tasks in a parallel fashion on a remote k8s cluster
//4. after each completed test write its name to a file to keep track of what finished for restart purposes
project.subprojects { Project subProject ->
@@ -45,32 +51,49 @@ class DistributedTesting implements Plugin<Project> {
println "Skipping modification of ${task.getPath()} as it's not scheduled for execution"
}
if (!task.hasProperty("ignoreForDistribution")) {
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag)
//this is what enables execution of a single test suite - for example node:parallelTest would execute all unit tests in node, node:parallelIntegrationTest would do the same for integration tests
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imagePushTask, tagToUseForRunningTests)
}
}
}
//now we are going to create "super" groupings of these KubesTest tasks, so that it is possible to invoke all submodule tests with a single command
//group all kubes tests by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
//now we are going to create "super" groupings of the Test tasks, so that it is possible to invoke all submodule tests with a single command
//group all test Tasks by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<Test>> allTestTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
.flatten()
.findAll { task -> task instanceof KubesTest }
.groupBy { task -> task.taskToExecuteName }
.findAll { task -> task instanceof Test }
.groupBy { Test task -> task.name }
//first step is to create a single task which will invoke all the submodule tasks for each grouping
//ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc]
//ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc]
//ie allUnitAndIntegrationTest will invoke [node:integrationTest, node:test, core:integrationTest, core:test, client:rpc:test , client:rpc:integrationTest ... etc]
Set<ParallelTestGroup> userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup))
Collection<ParallelTestGroup> userDefinedGroups = userGroups.forEach { testGrouping ->
List<KubesTest> groups = ((ParallelTestGroup) testGrouping).groups.collect {
allKubesTestingTasksGroupedByType.get(it)
}.flatten()
String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ")
userGroups.forEach { testGrouping ->
//for each "group" (ie: test, integrationTest) within the grouping find all the Test tasks which have the same name.
List<Test> groups = ((ParallelTestGroup) testGrouping).groups.collect { allTestTasksGroupedByType.get(it) }.flatten()
//join up these test tasks into a single set of tasks to invoke (node:test, node:integrationTest...)
String superListOfTasks = groups.collect { it.path }.join(" ")
//generate a preAllocate / deAllocate task which allows you to "pre-book" a node during the image building phase
//this prevents time lost to cloud provider node spin up time (assuming image build time > provider spin up time)
def (Task preAllocateTask, Task deAllocateTask) = generatePreAllocateAndDeAllocateTasksForGrouping(project, testGrouping)
//modify the image building task to depend on the preAllocate task (if specified on the command line) - this prevents gradle running out of order
if (preAllocateTask.name in requestedTaskNames) {
imageBuildTask.dependsOn preAllocateTask
}
def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" + testGrouping.name.capitalize(), KubesTest) {
if (!providedTag) {
dependsOn imageBuildingTask
if (!tagToUseForRunningTests) {
dependsOn imagePushTask
}
if (deAllocateTask.name in requestedTaskNames) {
dependsOn deAllocateTask
}
numberOfPods = testGrouping.getShardCount()
printOutput = testGrouping.printToStdOut
@@ -79,8 +102,9 @@ class DistributedTesting implements Plugin<Project> {
memoryGbPerFork = testGrouping.gbOfMemory
numberOfCoresPerFork = testGrouping.coresToUse
distribution = testGrouping.distribution
podLogLevel = testGrouping.logLevel
doFirst {
dockerTag = dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get())
dockerTag = tagToUseForRunningTests ? (ImageBuilding.registryName + ":" + tagToUseForRunningTests) : (imagePushTask.imageName.get() + ":" + imagePushTask.tag.get())
}
}
def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.name.capitalize()}", KubesReporting) {
@@ -99,6 +123,38 @@ class DistributedTesting implements Plugin<Project> {
}
}
private List<Task> generatePreAllocateAndDeAllocateTasksForGrouping(Project project, ParallelTestGroup testGrouping) {
PodAllocator allocator = new PodAllocator(project.getLogger())
Task preAllocateTask = project.rootProject.tasks.create("preAllocateFor" + testGrouping.name.capitalize()) {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY)
}
int seed = (dockerTag.hashCode() + testGrouping.name.hashCode())
String podPrefix = new BigInteger(64, new Random(seed)).toString(36)
//here we will pre-request the correct number of pods for this testGroup
int numberOfPodsToRequest = testGrouping.getShardCount()
int coresPerPod = testGrouping.getCoresToUse()
int memoryGBPerPod = testGrouping.getGbOfMemory()
allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix)
}
}
Task deAllocateTask = project.rootProject.tasks.create("deAllocateFor" + testGrouping.name.capitalize()) {
doFirst {
String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
if (dockerTag == null) {
throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY)
}
int seed = (dockerTag.hashCode() + testGrouping.name.hashCode())
String podPrefix = new BigInteger(64, new Random(seed)).toString(36);
allocator.tearDownPods(podPrefix)
}
}
return [preAllocateTask, deAllocateTask]
}
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()

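To make the wiring concrete: for the allParallelIntegrationTest group defined in build.gradle above, the plugin ends up generating the following task family (names assembled from the "preAllocateFor", "deAllocateFor", "userDefined" and "userDefinedReports" prefixes in this file):

    preAllocateForAllParallelIntegrationTest     // books placeholder pods; requires -Ddocker.build.tag
    deAllocateForAllParallelIntegrationTest      // deletes the placeholders; requires -Ddocker.run.tag
    userDefinedAllParallelIntegrationTest        // the generated KubesTest task that runs the group
    userDefinedReportsAllParallelIntegrationTest // the generated KubesReporting aggregation task
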
ImageBuilding.java

@@ -1,8 +1,17 @@
package net.corda.testing;
import com.bmuschko.gradle.docker.DockerRegistryCredentials;
import com.bmuschko.gradle.docker.tasks.container.*;
import com.bmuschko.gradle.docker.tasks.image.*;
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer;
import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer;
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage;
import com.bmuschko.gradle.docker.tasks.image.DockerCommitImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPullImage;
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage;
import com.bmuschko.gradle.docker.tasks.image.DockerRemoveImage;
import com.bmuschko.gradle.docker.tasks.image.DockerTagImage;
import org.gradle.api.GradleException;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
@@ -12,6 +21,7 @@ import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* this plugin is responsible for setting up all the required docker image building tasks required for producing and pushing an
@@ -20,7 +30,10 @@ import java.util.Map;
public class ImageBuilding implements Plugin<Project> {
public static final String registryName = "stefanotestingcr.azurecr.io/testing";
public static final String PROVIDE_TAG_FOR_BUILDING_PROPERTY = "docker.build.tag";
public static final String PROVIDE_TAG_FOR_RUNNING_PROPERTY = "docker.run.tag";
public DockerPushImage pushTask;
public DockerBuildImage buildTask;
@Override
public void apply(@NotNull final Project project) {
@@ -43,6 +56,8 @@ public class ImageBuilding implements Plugin<Project> {
dockerBuildImage.getDockerFile().set(new File(new File("testing"), "Dockerfile"));
});
this.buildTask = buildDockerImageForSource;
final DockerCreateContainer createBuildContainer = project.getTasks().create("createBuildContainer", DockerCreateContainer.class,
dockerCreateContainer -> {
final File baseWorkingDir = new File(System.getProperty("docker.work.dir") != null &&
@@ -58,7 +73,7 @@ public class ImageBuilding implements Plugin<Project> {
mavenDir.mkdirs();
}
project.getLogger().info("Will use: ${gradleDir.absolutePath} for caching gradle artifacts");
project.getLogger().info("Will use: " + gradleDir.getAbsolutePath() + " for caching gradle artifacts");
});
dockerCreateContainer.dependsOn(buildDockerImageForSource);
dockerCreateContainer.targetImageId(buildDockerImageForSource.getImageId());
@@ -103,8 +118,7 @@ public class ImageBuilding implements Plugin<Project> {
final DockerTagImage tagBuildImageResult = project.getTasks().create("tagBuildImageResult", DockerTagImage.class, dockerTagImage -> {
dockerTagImage.dependsOn(commitBuildImageResult);
dockerTagImage.getImageId().set(commitBuildImageResult.getImageId());
dockerTagImage.getTag().set(System.getProperty("docker.provided.tag",
"${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"));
dockerTagImage.getTag().set(System.getProperty(PROVIDE_TAG_FOR_BUILDING_PROPERTY, UUID.randomUUID().toString().toLowerCase().substring(0, 12)));
dockerTagImage.getRepository().set(registryName);
});

KubesTest.java

@@ -1,7 +1,18 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.Serialization;
@@ -11,12 +22,34 @@ import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;
import org.jetbrains.annotations.NotNull;
import java.io.*;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -33,18 +66,20 @@ public class KubesTest extends DefaultTask {
String fullTaskToExecutePath;
String taskToExecuteName;
Boolean printOutput = false;
Integer numberOfCoresPerFork = 4;
Integer memoryGbPerFork = 6;
public volatile List<File> testOutput = Collections.emptyList();
public volatile List<KubePodResult> containerResults = Collections.emptyList();
String namespace = "thisisatest";
public static String NAMESPACE = "thisisatest";
int k8sTimeout = 50 * 1_000;
int webSocketTimeout = k8sTimeout * 6;
int numberOfPods = 20;
int numberOfPods = 5;
int timeoutInMinutesForPodToStart = 60;
Distribution distribution = Distribution.METHOD;
DistributeTestsBy distribution = DistributeTestsBy.METHOD;
PodLogLevel podLogLevel = PodLogLevel.INFO;
@TaskAction
public void runDistributedTests() {
@@ -54,10 +89,8 @@ public class KubesTest extends DefaultTask {
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
String random = rnd64Base36(new Random());
final KubernetesClient client = getKubernetesClient();
try {
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
try (KubernetesClient client = getKubernetesClient()) {
client.pods().inNamespace(NAMESPACE).list().getItems().forEach(podToDelete -> {
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
client.resource(podToDelete).delete();
@@ -68,8 +101,9 @@ public class KubesTest extends DefaultTask {
}
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i;
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
String potentialPodName = (taskToExecuteName + "-" + stableRunId + random + i).toLowerCase();
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
return submitBuild(NAMESPACE, numberOfPods, i, podName, printOutput, 3);
}).collect(Collectors.toList());
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
@@ -107,8 +141,7 @@ public class KubesTest extends DefaultTask {
.toLowerCase();
}
private Future<KubePodResult> submitBuild(
KubernetesClient client,
private CompletableFuture<KubePodResult> submitBuild(
String namespace,
int numberOfPods,
int podIdx,
@@ -116,7 +149,9 @@
boolean printOutput,
int numberOfRetries
) {
return executorService.submit(() -> buildRunPodWithRetriesOrThrow(client, namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries));
return CompletableFuture.supplyAsync(() -> {
return buildRunPodWithRetriesOrThrow(namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries);
}, executorService);
}
private static void addShutdownHook(Runnable hook) {
@@ -125,27 +160,23 @@ public class KubesTest extends DefaultTask {
private PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
.inNamespace(namespace)
.inNamespace(NAMESPACE)
.createNew()
.editOrNewMetadata().withName(name).endMetadata()
.editOrNewSpec()
.withAccessModes("ReadWriteOnce")
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
.endSpec()
.done();
addShutdownHook(() -> {
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
System.out.println("Deleting PVC: " + pvc.getMetadata().getName());
client.persistentVolumeClaims().delete(pvc);
});
return pvc;
}
private KubePodResult buildRunPodWithRetriesOrThrow(
KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
@@ -155,48 +186,48 @@
) {
addShutdownHook(() -> {
System.out.println("deleting pod: " + podName);
client.pods().inNamespace(namespace).withName(podName).delete();
try (KubernetesClient client = getKubernetesClient()) {
client.pods().inNamespace(namespace).withName(podName).delete();
}
});
try {
// pods might die, so we retry
return Retry.fixed(numberOfRetries).run(() -> {
// remove pod if exists
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
oldPod.delete();
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
Thread.sleep(1000);
Pod createdPod;
try (KubernetesClient client = getKubernetesClient()) {
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
oldPod.delete();
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
Thread.sleep(1000);
}
}
getProject().getLogger().lifecycle("creating pod: " + podName);
createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
}
// recreate and run
getProject().getLogger().lifecycle("creating pod: " + podName);
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
attachStatusListenerToPod(client, createdPod);
waitForPodToStart(client, createdPod);
attachStatusListenerToPod(createdPod);
waitForPodToStart(createdPod);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
CompletableFuture<Integer> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
stdOutIs.connect(stdOutOs);
client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener)
.exec(getBuildCommand(numberOfPods, podIdx));
File podOutput = executeBuild(namespace, numberOfPods, podIdx, podName, printOutput, stdOutOs, stdOutIs, errChannelStream, waiter);
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
int resCode = waiter.join();
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
Collection<File> binaryResults = downloadTestXmlFromPod(namespace, createdPod);
getLogger().lifecycle("removing pod " + podName + " (" + podIdx + "/" + numberOfPods + ") after completed build");
try (KubernetesClient client = getKubernetesClient()) {
client.pods().delete(createdPod);
}
return new KubePodResult(resCode, podOutput, binaryResults);
});
} catch (Retry.RetryException e) {
@@ -204,6 +235,31 @@ public class KubesTest extends DefaultTask {
}
}
@NotNull
private File executeBuild(String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
PipedOutputStream stdOutOs,
PipedInputStream stdOutIs,
ByteArrayOutputStream errChannelStream,
CompletableFuture<Integer> waiter) throws IOException {
KubernetesClient client = getKubernetesClient();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
stdOutIs.connect(stdOutOs);
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
getProject().getLogger().quiet("About to execute " + Arrays.stream(buildCommand).reduce("", (s, s2) -> s + " " + s2) + " on pod " + podName);
client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener)
.exec(getBuildCommand(numberOfPods, podIdx));
return startLogPumping(stdOutIs, podIdx, printOutput);
}
private Pod buildPodRequest(String podName) {
return new PodBuilder()
.withNewMetadata().withName(podName).endMetadata()
@@ -283,7 +339,8 @@ public class KubesTest extends DefaultTask {
return outputFile;
}
private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
private Watch attachStatusListenerToPod(Pod pod) {
KubernetesClient client = getKubernetesClient();
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
@Override
public void eventReceived(Watcher.Action action, Pod resource) {
@@ -292,21 +349,24 @@ public class KubesTest extends DefaultTask {
@Override
public void onClose(KubernetesClientException cause) {
client.close();
}
});
}
private void waitForPodToStart(KubernetesClient client, Pod pod) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
private void waitForPodToStart(Pod pod) {
try (KubernetesClient client = getKubernetesClient()) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
}
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
}
private Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
private Collection<File> downloadTestXmlFromPod(String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports";
String binaryResultsFile = "results.bin";
String podName = cp.getMetadata().getName();
@@ -315,25 +375,42 @@ public class KubesTest extends DefaultTask {
if (!tempDir.toFile().exists()) {
tempDir.toFile().mkdirs();
}
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir);
try (KubernetesClient client = getKubernetesClient()) {
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir);
}
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
}
private String[] getBuildCommand(int numberOfPods, int podIdx) {
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " " + getLoggingLevel() + " 2>&1 ;" +
"let rs=$? ; sleep 10 ; exit ${rs}";
return new String[]{"bash", "-c", shellScript};
}
private String getLoggingLevel() {
switch (podLogLevel) {
case INFO:
return " --info";
case WARN:
return " --warn";
case QUIET:
return " --quiet";
case DEBUG:
return " --debug";
default:
throw new IllegalArgumentException("LogLevel: " + podLogLevel + " is unknown");
}
}
private List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
List<File> folders = new ArrayList<>();
@@ -362,7 +439,7 @@ public class KubesTest extends DefaultTask {
@Override
public void onFailure(Throwable t, Response response) {
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
getProject().getLogger().lifecycle("Received error from pod " + podName);
waitingFuture.completeExceptionally(t);
}

ListTests.groovy

@@ -47,7 +47,7 @@ class ListTests extends DefaultTask implements TestLister {
FileCollection scanClassPath
List<String> allTests
Distribution distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? Distribution.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : Distribution.METHOD
DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD
def getTestsForFork(int fork, int forks, Integer seed) {
def gitSha = new BigInteger(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0", 36)
@@ -66,7 +66,7 @@ class ListTests extends DefaultTask implements TestLister {
@TaskAction
def discoverTests() {
switch (distribution) {
case Distribution.METHOD:
case DistributeTestsBy.METHOD:
Collection<String> results = new ClassGraph()
.enableClassInfo()
.enableMethodInfo()
@@ -85,7 +85,7 @@ class ListTests extends DefaultTask implements TestLister {
this.allTests = results.stream().sorted().collect(Collectors.toList())
break
case Distribution.CLASS:
case DistributeTestsBy.CLASS:
Collection<String> results = new ClassGraph()
.enableClassInfo()
.enableMethodInfo()
@@ -105,6 +105,6 @@ class ListTests extends DefaultTask implements TestLister {
}
}
public enum Distribution {
public enum DistributeTestsBy {
CLASS, METHOD
}

ParallelTestGroup.groovy

@@ -3,19 +3,25 @@ package net.corda.testing
import org.gradle.api.DefaultTask
class ParallelTestGroup extends DefaultTask {
Distribution distribution = Distribution.METHOD
DistributeTestsBy distribution = DistributeTestsBy.METHOD
List<String> groups = new ArrayList<>()
int shardCount = 20
int coresToUse = 4
int gbOfMemory = 4
boolean printToStdOut = true
PodLogLevel logLevel = PodLogLevel.INFO
void numberOfShards(int shards) {
this.shardCount = shards
}
void distribute(Distribution dist){
void podLogLevel(PodLogLevel level) {
this.logLevel = level
}
void distribute(DistributeTestsBy dist) {
this.distribution = dist
}

PodAllocator.java (new file)

@@ -0,0 +1,135 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class PodAllocator {
private static final int CONNECTION_TIMEOUT = 60 * 1000; // timeout in milliseconds
private final Logger logger;
public PodAllocator(Logger logger) {
this.logger = logger;
}
public PodAllocator() {
this.logger = LoggerFactory.getLogger(PodAllocator.class);
}
public void allocatePods(Integer number, Integer coresPerPod, Integer memoryPerPod, String prefix) {
Config config = new ConfigBuilder()
.withConnectionTimeout(CONNECTION_TIMEOUT)
.withRequestTimeout(CONNECTION_TIMEOUT)
.withRollingTimeout(CONNECTION_TIMEOUT)
.withWebsocketTimeout(CONNECTION_TIMEOUT)
.withWebsocketPingInterval(CONNECTION_TIMEOUT)
.build();
KubernetesClient client = new DefaultKubernetesClient(config);
List<Pod> podsToRequest = IntStream.range(0, number).mapToObj(i -> buildPod("pa-" + prefix + i, coresPerPod, memoryPerPod)).collect(Collectors.toList());
podsToRequest.forEach(requestedPod -> {
String msg = "PreAllocating " + requestedPod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(msg);
} else {
logger.info(msg);
}
client.pods().inNamespace(KubesTest.NAMESPACE).create(requestedPod);
});
}
public void tearDownPods(String prefix) {
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
.withConnectionTimeout(CONNECTION_TIMEOUT)
.withRequestTimeout(CONNECTION_TIMEOUT)
.withRollingTimeout(CONNECTION_TIMEOUT)
.withWebsocketTimeout(CONNECTION_TIMEOUT)
.withWebsocketPingInterval(CONNECTION_TIMEOUT)
.build();
KubernetesClient client = new DefaultKubernetesClient(config);
Stream<Pod> podsToDelete = client.pods().inNamespace(KubesTest.NAMESPACE).list()
.getItems()
.stream()
.sorted(Comparator.comparing(p -> p.getMetadata().getName()))
.filter(foundPod -> foundPod.getMetadata().getName().contains(prefix));
List<CompletableFuture<Pod>> deleteFutures = podsToDelete.map(pod -> {
CompletableFuture<Pod> result = new CompletableFuture<>();
Watch watch = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
if (action == Action.DELETED) {
result.complete(resource);
String msg = "Successfully deleted pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).lifecycle(msg);
} else {
logger.info(msg);
}
}
}
@Override
public void onClose(KubernetesClientException cause) {
String message = "Failed to delete pod " + pod.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
((org.gradle.api.logging.Logger) logger).quiet(message);
} else {
logger.info(message);
}
result.completeExceptionally(cause);
}
});
client.pods().delete(pod);
return result;
}).collect(Collectors.toList());
try {
CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//ignore - there's nothing left to do
}
}
Pod buildPod(String podName, Integer coresPerPod, Integer memoryPerPod) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewContainer()
.withImage("busybox:latest")
.withCommand("sh")
.withArgs("-c", "sleep 300")
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity(coresPerPod.toString()))
.addToRequests("memory", new Quantity(memoryPerPod.toString() + "Gi"))
.endResources()
.endContainer()
.withRestartPolicy("Never")
.endSpec()
.build();
}
}
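
A sketch of the allocator's intended lifecycle, as driven by the generated (pre|de)Allocate tasks: both phases recompute the same prefix from the stable docker tag and group name, so nothing is shared between them. The prefix value here is illustrative; the shard/core/memory numbers match allParallelIntegrationTest above:

    class PodAllocatorLifecycleSketch {
        public static void main(String[] args) {
            PodAllocator allocator = new PodAllocator(); // no-arg ctor falls back to an SLF4J logger
            String prefix = "k2f0qnh3x1"; // stable prefix, derived as in the sketch near the top

            // image-build phase: pre-book 10 placeholder pods, 5 cores / 10 GB each
            allocator.allocatePods(10, 5, 10, prefix);

            // test phase: delete the placeholders so their capacity goes to the real test pods
            allocator.tearDownPods(prefix);
        }
    }
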

PodLogLevel.java (new file)

@@ -0,0 +1,5 @@
package net.corda.testing;
public enum PodLogLevel {
QUIET, WARN, INFO, DEBUG
}