From f9890a53590fe4e6743e6717f232f65b5927cbd7 Mon Sep 17 00:00:00 2001 From: Stefano Franz Date: Mon, 28 Oct 2019 11:48:04 +0000 Subject: [PATCH] PreAllocate pod resources during image build phase (#5587) * use zulu for jdk add some parallel groups * port kubesTest to Java remove asterisk from tests listed by ListTests, instead add after allocation * attempt to set up unit test builds with correct github integrations # Conflicts: # .ci/dev/unit/Jenkinsfile * fix issue with github context * add credentials block * start pre-allocating pods for builds * test * add blocks for reporting build stages * add logic to preallocate pods during image building * tidy up Jenkinsfile for unit tests * add magic command line flag to enable preallocation of pods * make docker tag deterministic * fix issue concatenating docker tag inputs * add build type specific Jenkinsfile * try new preallocation approach * make pre-allocation prefix group specific * force deAllocator to wait for pods to be actually deleted * revert jenkinsfiles in .ci * use smarter waiting logic to address review comments * add --stacktrace to builds to help debugging * fix issue with closed stream * add some logging around preallocation * tidy up by refactoring (de)allocate task generation into method * change default from 20 pods to 5 pods * fix issue where docker tag was unstable between building and running tests * more documentation * add some infrastructure around setting the log level for a given build * change preallocation pod duration to 5min * see if fast enough if using combined unit and integration tests * disable unit tests * print out test summaries * try and make the kubes client a per-use object, rather than a long-lived object. 
This is step one of making GKE use possible * add log line about what command is executed in the pod --- .ci/dev/smoke/Jenkinsfile | 72 ++++++ Jenkinsfile | 18 +- build.gradle | 42 ++-- .../corda/testing/DistributedTesting.groovy | 96 ++++++-- .../net/corda/testing/ImageBuilding.java | 24 +- .../groovy/net/corda/testing/KubesTest.java | 207 ++++++++++++------ .../groovy/net/corda/testing/ListTests.groovy | 8 +- .../corda/testing/ParallelTestGroup.groovy | 10 +- .../net/corda/testing/PodAllocator.java | 135 ++++++++++++ .../groovy/net/corda/testing/PodLogLevel.java | 5 + 10 files changed, 493 insertions(+), 124 deletions(-) create mode 100644 .ci/dev/smoke/Jenkinsfile create mode 100644 buildSrc/src/main/groovy/net/corda/testing/PodAllocator.java create mode 100644 buildSrc/src/main/groovy/net/corda/testing/PodLogLevel.java diff --git a/.ci/dev/smoke/Jenkinsfile b/.ci/dev/smoke/Jenkinsfile new file mode 100644 index 0000000000..603bc520dc --- /dev/null +++ b/.ci/dev/smoke/Jenkinsfile @@ -0,0 +1,72 @@ +@Library('existing-build-control') +import static com.r3.build.BuildControl.killAllExistingBuildsForJob + +killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) + +pipeline { + triggers { + issueCommentTrigger('.*smoke tests.*') + } + + agent { label 'k8s' } + options { timestamps() } + + environment { + DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}st" + EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}" + BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}" + } + + stages { + stage('Smoke Tests') { + steps { + script { + pullRequest.createStatus(status: 'pending', + context: 'continuous-integration/jenkins/pr-merge/smokeTest', + description: 'Smoke Tests Building', + targetUrl: "${env.JOB_URL}") + } + + withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) { + sh "./gradlew " + + "-DbuildId=\"\${BUILD_ID}\" " + + "-Dkubenetize=true " + + "-DpreAllocatePods=true " + + "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " 
+ + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + + " allParallelSmokeTest" + } + } + } + } + + post { + + always { + junit testResults: '**/build/test-results-xml/**/*.xml', allowEmptyResults: false + } + + success { + script { + pullRequest.createStatus(status: 'success', + context: 'continuous-integration/jenkins/pr-merge/smokeTest', + description: 'Smoke Tests Passed', + targetUrl: "${env.JOB_URL}testResults") + } + } + + failure { + script { + pullRequest.createStatus(status: 'failure', + context: 'continuous-integration/jenkins/pr-merge/smokeTest', + description: 'Smoke Tests Failed', + targetUrl: "${env.JOB_URL}testResults") + } + } + + cleanup { + deleteDir() /* clean up our workspace */ + } + } +} \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile index 13dc04c6a4..ab2862b88d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -22,8 +22,8 @@ pipeline { "-Dkubenetize=true " + "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + - "-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " clean pushBuildImage" + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + + " clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace" } sh "kubectl auth can-i get pods" } @@ -36,10 +36,20 @@ pipeline { sh "./gradlew " + "-DbuildId=\"\${BUILD_ID}\" " + "-Dkubenetize=true " + - "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " allParallelIntegrationTest" + "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" + + " deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace" } } +// stage('Unit Tests') { +// steps { +// sh "./gradlew " + +// "-DbuildId=\"\${BUILD_ID}\" " + +// "-Dkubenetize=true " + +// "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" + +// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace" +// } +// } + } } diff --git a/build.gradle b/build.gradle index 378494e1bd..2732afe515 100644 --- a/build.gradle +++ 
b/build.gradle @@ -1,7 +1,8 @@ import net.corda.testing.DistributedTesting +import net.corda.testing.DistributeTestsBy import net.corda.testing.ImageBuilding -import net.corda.testing.Distribution import net.corda.testing.ParallelTestGroup +import net.corda.testing.PodLogLevel import static org.gradle.api.JavaVersion.VERSION_11 import static org.gradle.api.JavaVersion.VERSION_1_8 @@ -28,8 +29,7 @@ buildscript { ext.quasar_version = constants.getProperty("quasarVersion11") ext.quasar_classifier = constants.getProperty("quasarClassifier11") ext.jdkClassifier = constants.getProperty("jdkClassifier11") - } - else { + } else { ext.quasar_version = constants.getProperty("quasarVersion") ext.quasar_classifier = constants.getProperty("quasarClassifier") ext.jdkClassifier = constants.getProperty("jdkClassifier") @@ -596,36 +596,22 @@ buildScan { } task allParallelIntegrationTest(type: ParallelTestGroup) { + podLogLevel PodLogLevel.INFO testGroups "integrationTest" numberOfShards 10 streamOutput false - coresPerFork 6 + coresPerFork 5 memoryInGbPerFork 10 - distribute Distribution.CLASS -} -task allParallelSlowIntegrationTest(type: ParallelTestGroup) { - testGroups "slowIntegrationTest" - numberOfShards 4 - streamOutput false - coresPerFork 6 - memoryInGbPerFork 10 - distribute Distribution.CLASS -} -task allParallelSmokeTest(type: ParallelTestGroup) { - testGroups "smokeTest" - numberOfShards 4 - streamOutput true - coresPerFork 6 - memoryInGbPerFork 10 - distribute Distribution.METHOD + distribute DistributeTestsBy.CLASS } task allParallelUnitTest(type: ParallelTestGroup) { + podLogLevel PodLogLevel.INFO testGroups "test" numberOfShards 10 streamOutput false coresPerFork 5 memoryInGbPerFork 6 - distribute Distribution.CLASS + distribute DistributeTestsBy.CLASS } task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { testGroups "test", "integrationTest" @@ -633,7 +619,7 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { streamOutput false 
coresPerFork 6 memoryInGbPerFork 10 - distribute Distribution.CLASS + distribute DistributeTestsBy.CLASS } task parallelRegressionTest(type: ParallelTestGroup) { testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest" @@ -641,7 +627,15 @@ task parallelRegressionTest(type: ParallelTestGroup) { streamOutput false coresPerFork 6 memoryInGbPerFork 10 - distribute Distribution.CLASS + distribute DistributeTestsBy.CLASS +} +task allParallelSmokeTest(type: ParallelTestGroup) { + testGroups "slowIntegrationTest", "smokeTest" + numberOfShards 4 + streamOutput false + coresPerFork 6 + memoryInGbPerFork 10 + distribute DistributeTestsBy.CLASS } apply plugin: ImageBuilding apply plugin: DistributedTesting diff --git a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy index 4bff02cf2d..c8eab20f08 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy @@ -1,8 +1,11 @@ package net.corda.testing +import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage import com.bmuschko.gradle.docker.tasks.image.DockerPushImage +import org.gradle.api.GradleException import org.gradle.api.Plugin import org.gradle.api.Project +import org.gradle.api.Task import org.gradle.api.tasks.testing.Test /** @@ -18,19 +21,22 @@ class DistributedTesting implements Plugin { void apply(Project project) { if (System.getProperty("kubenetize") != null) { - def forks = getPropertyAsInt(project, "dockerForks", 1) + Integer forks = getPropertyAsInt(project, "dockerForks", 1) ensureImagePluginIsApplied(project) ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding) - DockerPushImage imageBuildingTask = imagePlugin.pushTask - String providedTag = System.getProperty("docker.tag") + DockerPushImage imagePushTask = imagePlugin.pushTask + DockerBuildImage imageBuildTask = imagePlugin.buildTask + 
String tagToUseForRunningTests = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY) + String tagToUseForBuilding = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY) BucketingAllocatorTask globalAllocator = project.tasks.create("bucketingAllocator", BucketingAllocatorTask, forks) - def requestedTasks = project.gradle.startParameter.taskNames.collect { project.tasks.findByPath(it) } + Set requestedTaskNames = project.gradle.startParameter.taskNames.toSet() + def requestedTasks = requestedTaskNames.collect { project.tasks.findByPath(it) } //in each subproject - //1. add the task to determine all tests within the module - //2. modify the underlying testing task to use the output of the listing task to include a subset of tests for each fork + //1. add the task to determine all tests within the module and register this as a source to the global allocator + //2. modify the underlying testing task to use the output of the global allocator to include a subset of tests for each fork //3. KubesTest will invoke these test tasks in a parallel fashion on a remote k8s cluster //4. 
after each completed test write its name to a file to keep track of what finished for restart purposes project.subprojects { Project subProject -> @@ -45,32 +51,49 @@ class DistributedTesting implements Plugin { println "Skipping modification of ${task.getPath()} as it's not scheduled for execution" } if (!task.hasProperty("ignoreForDistribution")) { - KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag) + //this is what enables execution of a single test suite - for example node:parallelTest would execute all unit tests in node, node:parallelIntegrationTest would do the same for integration tests + KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imagePushTask, tagToUseForRunningTests) } } } - //now we are going to create "super" groupings of these KubesTest tasks, so that it is possible to invoke all submodule tests with a single command - //group all kubes tests by their underlying target task (test/integrationTest/smokeTest ... etc) - Map> allKubesTestingTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() } + //now we are going to create "super" groupings of the Test tasks, so that it is possible to invoke all submodule tests with a single command + //group all test Tasks by their underlying target task (test/integrationTest/smokeTest ... etc) + Map> allTestTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() } .flatten() - .findAll { task -> task instanceof KubesTest } - .groupBy { task -> task.taskToExecuteName } + .findAll { task -> task instanceof Test } + .groupBy { Test task -> task.name } //first step is to create a single task which will invoke all the submodule tasks for each grouping //ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc] //ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... 
etc] + //ie allUnitAndIntegrationTest will invoke [node:integrationTest, node:test, core:integrationTest, core:test, client:rpc:test , client:rpc:integrationTest ... etc] Set userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup)) - Collection userDefinedGroups = userGroups.forEach { testGrouping -> - List groups = ((ParallelTestGroup) testGrouping).groups.collect { - allKubesTestingTasksGroupedByType.get(it) - }.flatten() - String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ") + userGroups.forEach { testGrouping -> + + //for each "group" (ie: test, integrationTest) within the grouping find all the Test tasks which have the same name. + List groups = ((ParallelTestGroup) testGrouping).groups.collect { allTestTasksGroupedByType.get(it) }.flatten() + + //join up these test tasks into a single set of tasks to invoke (node:test, node:integrationTest...) + String superListOfTasks = groups.collect { it.path }.join(" ") + + //generate a preAllocate / deAllocate task which allows you to "pre-book" a node during the image building phase + //this prevents time lost to cloud provider node spin up time (assuming image build time > provider spin up time) + def (Task preAllocateTask, Task deAllocateTask) = generatePreAllocateAndDeAllocateTasksForGrouping(project, testGrouping) + + //modify the image building task to depend on the preAllocate task (if specified on the command line) - this prevents gradle running out of order + if (preAllocateTask.name in requestedTaskNames) { + imageBuildTask.dependsOn preAllocateTask + } def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" + testGrouping.name.capitalize(), KubesTest) { - if (!providedTag) { - dependsOn imageBuildingTask + if (!tagToUseForRunningTests) { + dependsOn imagePushTask + } + + if (deAllocateTask.name in requestedTaskNames) { + dependsOn deAllocateTask } numberOfPods = testGrouping.getShardCount() printOutput = testGrouping.printToStdOut @@ -79,8 +102,9 @@ 
class DistributedTesting implements Plugin { memoryGbPerFork = testGrouping.gbOfMemory numberOfCoresPerFork = testGrouping.coresToUse distribution = testGrouping.distribution + podLogLevel = testGrouping.logLevel doFirst { - dockerTag = dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()) + dockerTag = tagToUseForRunningTests ? (ImageBuilding.registryName + ":" + tagToUseForRunningTests) : (imagePushTask.imageName.get() + ":" + imagePushTask.tag.get()) } } def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.name.capitalize()}", KubesReporting) { @@ -99,6 +123,38 @@ class DistributedTesting implements Plugin { } } + private List generatePreAllocateAndDeAllocateTasksForGrouping(Project project, ParallelTestGroup testGrouping) { + PodAllocator allocator = new PodAllocator(project.getLogger()) + Task preAllocateTask = project.rootProject.tasks.create("preAllocateFor" + testGrouping.name.capitalize()) { + doFirst { + String dockerTag = System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY) + if (dockerTag == null) { + throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_BUILDING_PROPERTY) + } + int seed = (dockerTag.hashCode() + testGrouping.name.hashCode()) + String podPrefix = new BigInteger(64, new Random(seed)).toString(36) + //here we will pre-request the correct number of pods for this testGroup + int numberOfPodsToRequest = testGrouping.getShardCount() + int coresPerPod = testGrouping.getCoresToUse() + int memoryGBPerPod = testGrouping.getGbOfMemory() + allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix) + } + } + + Task deAllocateTask = project.rootProject.tasks.create("deAllocateFor" + testGrouping.name.capitalize()) { + doFirst { + String dockerTag = 
System.getProperty(ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY) + if (dockerTag == null) { + throw new GradleException("pre allocation cannot be used without a stable docker tag - please provide one using -D" + ImageBuilding.PROVIDE_TAG_FOR_RUNNING_PROPERTY) + } + int seed = (dockerTag.hashCode() + testGrouping.name.hashCode()) + String podPrefix = new BigInteger(64, new Random(seed)).toString(36); + allocator.tearDownPods(podPrefix) + } + } + return [preAllocateTask, deAllocateTask] + } + private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) { def taskName = task.getName() def capitalizedTaskName = task.getName().capitalize() diff --git a/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.java b/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.java index ce4c738a2a..6d2346e7dc 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.java +++ b/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.java @@ -1,8 +1,17 @@ package net.corda.testing; import com.bmuschko.gradle.docker.DockerRegistryCredentials; -import com.bmuschko.gradle.docker.tasks.container.*; -import com.bmuschko.gradle.docker.tasks.image.*; +import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer; +import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer; +import com.bmuschko.gradle.docker.tasks.container.DockerRemoveContainer; +import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer; +import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer; +import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage; +import com.bmuschko.gradle.docker.tasks.image.DockerCommitImage; +import com.bmuschko.gradle.docker.tasks.image.DockerPullImage; +import com.bmuschko.gradle.docker.tasks.image.DockerPushImage; +import com.bmuschko.gradle.docker.tasks.image.DockerRemoveImage; +import com.bmuschko.gradle.docker.tasks.image.DockerTagImage; 
import org.gradle.api.GradleException; import org.gradle.api.Plugin; import org.gradle.api.Project; @@ -12,6 +21,7 @@ import java.io.File; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; /** * this plugin is responsible for setting up all the required docker image building tasks required for producing and pushing an @@ -20,7 +30,10 @@ import java.util.Map; public class ImageBuilding implements Plugin { public static final String registryName = "stefanotestingcr.azurecr.io/testing"; + public static final String PROVIDE_TAG_FOR_BUILDING_PROPERTY = "docker.build.tag"; + public static final String PROVIDE_TAG_FOR_RUNNING_PROPERTY = "docker.run.tag"; public DockerPushImage pushTask; + public DockerBuildImage buildTask; @Override public void apply(@NotNull final Project project) { @@ -43,6 +56,8 @@ public class ImageBuilding implements Plugin { dockerBuildImage.getDockerFile().set(new File(new File("testing"), "Dockerfile")); }); + this.buildTask = buildDockerImageForSource; + final DockerCreateContainer createBuildContainer = project.getTasks().create("createBuildContainer", DockerCreateContainer.class, dockerCreateContainer -> { final File baseWorkingDir = new File(System.getProperty("docker.work.dir") != null && @@ -58,7 +73,7 @@ public class ImageBuilding implements Plugin { mavenDir.mkdirs(); } - project.getLogger().info("Will use: ${gradleDir.absolutePath} for caching gradle artifacts"); + project.getLogger().info("Will use: " + gradleDir.getAbsolutePath() + " for caching gradle artifacts"); }); dockerCreateContainer.dependsOn(buildDockerImageForSource); dockerCreateContainer.targetImageId(buildDockerImageForSource.getImageId()); @@ -103,8 +118,7 @@ public class ImageBuilding implements Plugin { final DockerTagImage tagBuildImageResult = project.getTasks().create("tagBuildImageResult", DockerTagImage.class, dockerTagImage -> { dockerTagImage.dependsOn(commitBuildImageResult); 
dockerTagImage.getImageId().set(commitBuildImageResult.getImageId()); - dockerTagImage.getTag().set(System.getProperty("docker.provided.tag", - "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}")); + dockerTagImage.getTag().set(System.getProperty(PROVIDE_TAG_FOR_BUILDING_PROPERTY, UUID.randomUUID().toString().toLowerCase().substring(0, 12))); dockerTagImage.getRepository().set(registryName); }); diff --git a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java index 3671bb7358..db7d139333 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java +++ b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java @@ -1,7 +1,18 @@ package net.corda.testing; -import io.fabric8.kubernetes.api.model.*; -import io.fabric8.kubernetes.client.*; +import io.fabric8.kubernetes.api.model.DoneablePod; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaim; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.Status; +import io.fabric8.kubernetes.api.model.StatusCause; +import io.fabric8.kubernetes.api.model.StatusDetails; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.utils.Serialization; @@ -11,12 +22,34 @@ import org.gradle.api.DefaultTask; import org.gradle.api.tasks.TaskAction; import org.jetbrains.annotations.NotNull; -import java.io.*; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; 
+import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.math.BigInteger; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -33,18 +66,20 @@ public class KubesTest extends DefaultTask { String fullTaskToExecutePath; String taskToExecuteName; Boolean printOutput = false; + Integer numberOfCoresPerFork = 4; Integer memoryGbPerFork = 6; public volatile List testOutput = Collections.emptyList(); public volatile List containerResults = Collections.emptyList(); - String namespace = "thisisatest"; + public static String NAMESPACE = "thisisatest"; int k8sTimeout = 50 * 1_000; int webSocketTimeout = k8sTimeout * 6; - int numberOfPods = 20; + int numberOfPods = 5; int timeoutInMinutesForPodToStart = 60; - Distribution distribution = Distribution.METHOD; + DistributeTestsBy distribution = DistributeTestsBy.METHOD; + PodLogLevel podLogLevel = PodLogLevel.INFO; @TaskAction public void runDistributedTests() { @@ -54,10 +89,8 @@ public class KubesTest extends DefaultTask { String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())); String random = rnd64Base36(new Random()); - final KubernetesClient client = getKubernetesClient(); - - try { - 
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> { + try (KubernetesClient client = getKubernetesClient()) { + client.pods().inNamespace(NAMESPACE).list().getItems().forEach(podToDelete -> { if (podToDelete.getMetadata().getName().contains(stableRunId)) { getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName()); client.resource(podToDelete).delete(); @@ -68,8 +101,9 @@ public class KubesTest extends DefaultTask { } List> futures = IntStream.range(0, numberOfPods).mapToObj(i -> { - String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i; - return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3); + String potentialPodName = (taskToExecuteName + "-" + stableRunId + random + i).toLowerCase(); + String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62)); + return submitBuild(NAMESPACE, numberOfPods, i, podName, printOutput, 3); }).collect(Collectors.toList()); this.testOutput = Collections.synchronizedList(futures.stream().map(it -> { @@ -107,8 +141,7 @@ public class KubesTest extends DefaultTask { .toLowerCase(); } - private Future submitBuild( - KubernetesClient client, + private CompletableFuture submitBuild( String namespace, int numberOfPods, int podIdx, @@ -116,7 +149,9 @@ public class KubesTest extends DefaultTask { boolean printOutput, int numberOfRetries ) { - return executorService.submit(() -> buildRunPodWithRetriesOrThrow(client, namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries)); + return CompletableFuture.supplyAsync(() -> { + return buildRunPodWithRetriesOrThrow(namespace, numberOfPods, podIdx, podName, printOutput, numberOfRetries); + }, executorService); } private static void addShutdownHook(Runnable hook) { @@ -125,27 +160,23 @@ public class KubesTest extends DefaultTask { private PersistentVolumeClaim createPvc(KubernetesClient client, String name) { PersistentVolumeClaim pvc = 
client.persistentVolumeClaims() - .inNamespace(namespace) + .inNamespace(NAMESPACE) .createNew() - .editOrNewMetadata().withName(name).endMetadata() - .editOrNewSpec() .withAccessModes("ReadWriteOnce") .editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources() .endSpec() - .done(); addShutdownHook(() -> { - System.out.println("Deleing PVC: " + pvc.getMetadata().getName()); + System.out.println("Deleting PVC: " + pvc.getMetadata().getName()); client.persistentVolumeClaims().delete(pvc); }); return pvc; } private KubePodResult buildRunPodWithRetriesOrThrow( - KubernetesClient client, String namespace, int numberOfPods, int podIdx, @@ -155,48 +186,48 @@ public class KubesTest extends DefaultTask { ) { addShutdownHook(() -> { System.out.println("deleting pod: " + podName); - client.pods().inNamespace(namespace).withName(podName).delete(); + try (KubernetesClient client = getKubernetesClient()) { + client.pods().inNamespace(namespace).withName(podName).delete(); + } }); try { // pods might die, so we retry return Retry.fixed(numberOfRetries).run(() -> { // remove pod if exists - PodResource oldPod = client.pods().inNamespace(namespace).withName(podName); - if (oldPod.get() != null) { - getLogger().lifecycle("deleting pod: {}", podName); - oldPod.delete(); - while (oldPod.get() != null) { - getLogger().info("waiting for pod {} to be removed", podName); - Thread.sleep(1000); + Pod createdPod; + try (KubernetesClient client = getKubernetesClient()) { + PodResource oldPod = client.pods().inNamespace(namespace).withName(podName); + if (oldPod.get() != null) { + getLogger().lifecycle("deleting pod: {}", podName); + oldPod.delete(); + while (oldPod.get() != null) { + getLogger().info("waiting for pod {} to be removed", podName); + Thread.sleep(1000); + } } + getProject().getLogger().lifecycle("creating pod: " + podName); + createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName)); + 
getProject().getLogger().lifecycle("scheduled pod: " + podName); } - // recreate and run - getProject().getLogger().lifecycle("creating pod: " + podName); - Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName)); - getProject().getLogger().lifecycle("scheduled pod: " + podName); - - attachStatusListenerToPod(client, createdPod); - waitForPodToStart(client, createdPod); + attachStatusListenerToPod(createdPod); + waitForPodToStart(createdPod); PipedOutputStream stdOutOs = new PipedOutputStream(); PipedInputStream stdOutIs = new PipedInputStream(4096); ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream(); CompletableFuture waiter = new CompletableFuture<>(); - ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter); - stdOutIs.connect(stdOutOs); - client.pods().inNamespace(namespace).withName(podName) - .writingOutput(stdOutOs) - .writingErrorChannel(errChannelStream) - .usingListener(execListener) - .exec(getBuildCommand(numberOfPods, podIdx)); + File podOutput = executeBuild(namespace, numberOfPods, podIdx, podName, printOutput, stdOutOs, stdOutIs, errChannelStream, waiter); - File podOutput = startLogPumping(stdOutIs, podIdx, printOutput); int resCode = waiter.join(); getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results"); - Collection binaryResults = downloadTestXmlFromPod(client, namespace, createdPod); + Collection binaryResults = downloadTestXmlFromPod(namespace, createdPod); + getLogger().lifecycle("removing pod " + podName + " (" + podIdx + "/" + numberOfPods + ") after completed build"); + try (KubernetesClient client = getKubernetesClient()) { + client.pods().delete(createdPod); + } return new KubePodResult(resCode, podOutput, binaryResults); }); } catch (Retry.RetryException e) { @@ -204,6 +235,31 @@ public class KubesTest extends DefaultTask { } } + @NotNull + private File executeBuild(String 
namespace, + int numberOfPods, + int podIdx, + String podName, + boolean printOutput, + PipedOutputStream stdOutOs, + PipedInputStream stdOutIs, + ByteArrayOutputStream errChannelStream, + CompletableFuture waiter) throws IOException { + KubernetesClient client = getKubernetesClient(); + ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter); + stdOutIs.connect(stdOutOs); + + String[] buildCommand = getBuildCommand(numberOfPods, podIdx); + getProject().getLogger().quiet("About to execute " + Arrays.stream(buildCommand).reduce("", (s, s2) -> s + " " + s2) + " on pod " + podName); + client.pods().inNamespace(namespace).withName(podName) + .writingOutput(stdOutOs) + .writingErrorChannel(errChannelStream) + .usingListener(execListener) + .exec(buildCommand); + + return startLogPumping(stdOutIs, podIdx, printOutput); + } + private Pod buildPodRequest(String podName) { return new PodBuilder() .withNewMetadata().withName(podName).endMetadata() @@ -283,7 +339,8 @@ public class KubesTest extends DefaultTask { return outputFile; } - private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) { + private Watch attachStatusListenerToPod(Pod pod) { + KubernetesClient client = getKubernetesClient(); return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher() { @Override public void eventReceived(Watcher.Action action, Pod resource) { @@ -292,21 +349,24 @@ public class KubesTest extends DefaultTask { @Override public void onClose(KubernetesClientException cause) { + client.close(); } }); } - private void waitForPodToStart(KubernetesClient client, Pod pod) { - getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build"); - try { - client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, 
TimeUnit.MINUTES); - } catch (InterruptedException e) { - throw new RuntimeException(e); + private void waitForPodToStart(Pod pod) { + try (KubernetesClient client = getKubernetesClient()) { + getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build"); + try { + client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build"); } - getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build"); } - private Collection downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) { + private Collection downloadTestXmlFromPod(String namespace, Pod cp) { String resultsInContainerPath = "/tmp/source/build/test-reports"; String binaryResultsFile = "results.bin"; String podName = cp.getMetadata().getName();
@@ -315,25 +375,53 @@ public class KubesTest extends DefaultTask {
         if (!tempDir.toFile().exists()) {
             tempDir.toFile().mkdirs();
         }
-        getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
-        client.pods()
-                .inNamespace(namespace)
-                .withName(podName)
-                .dir(resultsInContainerPath)
-                .copy(tempDir);
-
+        try (KubernetesClient client = getKubernetesClient()) {
+            client.pods()
+                    .inNamespace(namespace)
+                    .withName(podName)
+                    .dir(resultsInContainerPath)
+                    .copy(tempDir);
+        }
         return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
     }
 
     private String[] getBuildCommand(int numberOfPods, int podIdx) {
         String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " +
                 "cd /tmp/source ; " +
                 "let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
-                "./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
+                "./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " " + getLoggingLevel() + " 2>&1 ;" +
                 "let rs=$? ; sleep 10 ; exit ${rs}";
         return new String[]{"bash", "-c", shellScript};
     }
 
+    /**
+     * Maps the configured {@code podLogLevel} to the Gradle command-line flag that getBuildCommand
+     * appends to the build invocation.
+     *
+     * @return the flag including its leading space, e.g. {@code " --info"}
+     * @throws IllegalArgumentException if the level has no flag mapped to it
+     */
+    private String getLoggingLevel() {
+        final String gradleFlag;
+        switch (podLogLevel) {
+            case QUIET:
+                gradleFlag = " --quiet";
+                break;
+            case WARN:
+                gradleFlag = " --warn";
+                break;
+            case INFO:
+                gradleFlag = " --info";
+                break;
+            case DEBUG:
+                gradleFlag = " --debug";
+                break;
+            default:
+                throw new IllegalArgumentException("LogLevel: " + podLogLevel + " is unknown");
+        }
+        return gradleFlag;
+    }
+
     private List findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
         Queue filesToInspect = new LinkedList<>(Collections.singletonList(start));
         List folders = new ArrayList<>();
@@ -362,7 +439,7 @@ public class KubesTest extends DefaultTask { @Override public void onFailure(Throwable t, Response response) { - getProject().getLogger().lifecycle("Received error from rom pod " + podName); + getProject().getLogger().lifecycle("Received error from pod " + podName); waitingFuture.completeExceptionally(t); } diff --git a/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy b/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy index b1836d1eec..789bab4392 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy @@ -47,7 +47,7 @@ class ListTests extends DefaultTask implements TestLister { FileCollection scanClassPath List allTests - Distribution
distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? Distribution.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : Distribution.METHOD + DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) ? DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD def getTestsForFork(int fork, int forks, Integer seed) { def gitSha = new BigInteger(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0", 36) @@ -66,7 +66,7 @@ class ListTests extends DefaultTask implements TestLister { @TaskAction def discoverTests() { switch (distribution) { - case Distribution.METHOD: + case DistributeTestsBy.METHOD: Collection results = new ClassGraph() .enableClassInfo() .enableMethodInfo() @@ -85,7 +85,7 @@ class ListTests extends DefaultTask implements TestLister { this.allTests = results.stream().sorted().collect(Collectors.toList()) break - case Distribution.CLASS: + case DistributeTestsBy.CLASS: Collection results = new ClassGraph() .enableClassInfo() .enableMethodInfo() @@ -105,6 +105,6 @@ class ListTests extends DefaultTask implements TestLister { } } -public enum Distribution { +public enum DistributeTestsBy { CLASS, METHOD } \ No newline at end of file diff --git a/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy b/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy index 322b134c70..19e589fb5f 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy @@ -3,19 +3,25 @@ package net.corda.testing import org.gradle.api.DefaultTask class ParallelTestGroup extends DefaultTask { - Distribution distribution = Distribution.METHOD + + DistributeTestsBy distribution = DistributeTestsBy.METHOD List groups = new ArrayList<>() int shardCount = 20 int coresToUse = 4 int gbOfMemory = 4 boolean printToStdOut = true + PodLogLevel logLevel = PodLogLevel.INFO void 
numberOfShards(int shards) { this.shardCount = shards } - void distribute(Distribution dist){ + void podLogLevel(PodLogLevel level) { + this.logLevel = level + } + + void distribute(DistributeTestsBy dist) { this.distribution = dist }
diff --git a/buildSrc/src/main/groovy/net/corda/testing/PodAllocator.java b/buildSrc/src/main/groovy/net/corda/testing/PodAllocator.java
new file mode 100644
index 0000000000..e0a11a5fb3
--- /dev/null
+++ b/buildSrc/src/main/groovy/net/corda/testing/PodAllocator.java
@@ -0,0 +1,201 @@
+package net.corda.testing;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Creates and removes placeholder ("pre-allocated") pods in the {@code KubesTest.NAMESPACE} namespace.
+ * Each placeholder is a single busybox container that runs {@code sleep 300} while requesting the
+ * given CPU and memory.
+ */
+public class PodAllocator {
+
+    // NOTE(review): 60_1000 is 601,000 ms (roughly ten minutes). The digit grouping suggests that
+    // 60_000 (one minute) was intended - confirm before changing the value.
+    private static final int CONNECTION_TIMEOUT = 60_1000;
+    private final Logger logger;
+
+    /**
+     * @param logger receives progress messages; a Gradle logger is used at its quiet/lifecycle levels,
+     *               any other logger at info
+     */
+    public PodAllocator(Logger logger) {
+        this.logger = logger;
+    }
+
+    /** Creates an allocator that logs through the SLF4J logger obtained for this class. */
+    public PodAllocator() {
+        this.logger = LoggerFactory.getLogger(PodAllocator.class);
+    }
+
+    /**
+     * Creates {@code number} placeholder pods named {@code "pa-" + prefix + index}.
+     *
+     * @param number       how many pods to create
+     * @param coresPerPod  "cpu" request of each pod
+     * @param memoryPerPod "memory" request of each pod, in Gi
+     * @param prefix       name fragment placed between "pa-" and the pod index
+     */
+    public void allocatePods(Integer number, Integer coresPerPod, Integer memoryPerPod, String prefix) {
+        List<Pod> podsToRequest = IntStream.range(0, number)
+                .mapToObj(i -> buildPod("pa-" + prefix + i, coresPerPod, memoryPerPod))
+                .collect(Collectors.toList());
+
+        // The create calls are the only use of the client, so release it as soon as they are done.
+        try (KubernetesClient client = newClient()) {
+            for (Pod requestedPod : podsToRequest) {
+                logQuiet("PreAllocating " + requestedPod.getMetadata().getName());
+                client.pods().inNamespace(KubesTest.NAMESPACE).create(requestedPod);
+            }
+        }
+    }
+
+    /**
+     * Deletes every pod in {@code KubesTest.NAMESPACE} whose name contains {@code prefix}, then waits
+     * up to five minutes for the deletions to be observed. A failed, timed-out or interrupted wait
+     * is logged rather than thrown.
+     *
+     * @param prefix name fragment identifying the pods to remove
+     */
+    public void tearDownPods(String prefix) {
+        try (KubernetesClient client = newClient()) {
+            // NOTE(review): substring match - any pod whose name contains the prefix is deleted,
+            // not only the "pa-" placeholders. Confirm that is intended.
+            Stream<Pod> podsToDelete = client.pods().inNamespace(KubesTest.NAMESPACE).list()
+                    .getItems()
+                    .stream()
+                    .filter(foundPod -> foundPod.getMetadata().getName().contains(prefix))
+                    .sorted(Comparator.comparing(p -> p.getMetadata().getName()));
+
+            List<CompletableFuture<Pod>> deleteFutures = podsToDelete
+                    .map(pod -> deletePod(client, pod))
+                    .collect(Collectors.toList());
+
+            try {
+                CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
+            } catch (InterruptedException e) {
+                // Best effort: stop waiting, but keep the interrupt visible to the caller.
+                Thread.currentThread().interrupt();
+                logQuiet("Interrupted while waiting for pods matching " + prefix + " to be deleted");
+            } catch (ExecutionException | TimeoutException e) {
+                // Best effort: nothing is left to do except report it.
+                logQuiet("Not all pods matching " + prefix + " were confirmed deleted: " + e);
+            } finally {
+                // Completing the outstanding futures closes their watches (see deletePod).
+                deleteFutures.forEach(future -> future.cancel(false));
+            }
+        }
+    }
+
+    /**
+     * Requests deletion of {@code pod} and returns a future that completes with the pod once a
+     * DELETED event for it is received, or exceptionally if the watch closes first.
+     */
+    private CompletableFuture<Pod> deletePod(KubernetesClient client, Pod pod) {
+        String podName = pod.getMetadata().getName();
+        CompletableFuture<Pod> result = new CompletableFuture<>();
+        Watch watch = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(podName).watch(new Watcher<Pod>() {
+            @Override
+            public void eventReceived(Action action, Pod resource) {
+                if (action == Action.DELETED) {
+                    result.complete(resource);
+                    logLifecycle("Successfully deleted pod " + podName);
+                }
+            }
+
+            @Override
+            public void onClose(KubernetesClientException cause) {
+                if (result.isDone()) {
+                    // The outcome is already known, so this close is not a failed deletion.
+                    return;
+                }
+                logQuiet("Failed to delete pod " + podName);
+                // completeExceptionally rejects null, so substitute an exception when no cause is given.
+                result.completeExceptionally(cause != null
+                        ? cause
+                        : new IllegalStateException("Watch closed before pod " + podName + " was deleted"));
+            }
+        });
+        // Release the watch once the outcome is known, whichever way the future completes.
+        result.whenComplete((deletedPod, failure) -> watch.close());
+        client.pods().delete(pod);
+        return result;
+    }
+
+    /** Builds a client whose connection, request, rolling and websocket settings all use CONNECTION_TIMEOUT. */
+    private static KubernetesClient newClient() {
+        Config config = new ConfigBuilder()
+                .withConnectionTimeout(CONNECTION_TIMEOUT)
+                .withRequestTimeout(CONNECTION_TIMEOUT)
+                .withRollingTimeout(CONNECTION_TIMEOUT)
+                .withWebsocketTimeout(CONNECTION_TIMEOUT)
+                .withWebsocketPingInterval(CONNECTION_TIMEOUT)
+                .build();
+        return new DefaultKubernetesClient(config);
+    }
+
+    /** Logs at Gradle's quiet level when the logger is a Gradle logger, otherwise at info. */
+    private void logQuiet(String msg) {
+        if (logger instanceof org.gradle.api.logging.Logger) {
+            ((org.gradle.api.logging.Logger) logger).quiet(msg);
+        } else {
+            logger.info(msg);
+        }
+    }
+
+    /** Logs at Gradle's lifecycle level when the logger is a Gradle logger, otherwise at info. */
+    private void logLifecycle(String msg) {
+        if (logger instanceof org.gradle.api.logging.Logger) {
+            ((org.gradle.api.logging.Logger) logger).lifecycle(msg);
+        } else {
+            logger.info(msg);
+        }
+    }
+
+    /**
+     * Describes one placeholder pod: a busybox container that runs {@code sleep 300} and is never restarted.
+     *
+     * @param podName      used as both the pod name and the container name
+     * @param coresPerPod  value of the "cpu" resource request
+     * @param memoryPerPod value of the "memory" resource request, in Gi
+     * @return the pod definition; nothing is sent to the cluster here
+     */
+    Pod buildPod(String podName, Integer coresPerPod, Integer memoryPerPod) {
+        return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
+                .withNewSpec()
+                .addNewContainer()
+                .withImage("busybox:latest")
+                .withCommand("sh")
+                .withArgs("-c", "sleep 300")
+                .withName(podName)
+                .withNewResources()
+                .addToRequests("cpu", new Quantity(coresPerPod.toString()))
+                .addToRequests("memory", new Quantity(memoryPerPod.toString() + "Gi"))
+                .endResources()
+                .endContainer()
+                .withRestartPolicy("Never")
+                .endSpec()
+                .build();
+    }
+
+}
diff --git a/buildSrc/src/main/groovy/net/corda/testing/PodLogLevel.java b/buildSrc/src/main/groovy/net/corda/testing/PodLogLevel.java
new file mode 100644
index 0000000000..27ce56f9f2
--- /dev/null
+++ b/buildSrc/src/main/groovy/net/corda/testing/PodLogLevel.java
@@ -0,0 +1,9 @@
+package net.corda.testing;
+
+/**
+ * Verbosity of the Gradle build executed inside a test pod; KubesTest turns each constant
+ * into the matching Gradle flag (--quiet, --warn, --info, --debug).
+ */
+public enum PodLogLevel {
+    QUIET, WARN, INFO, DEBUG
+}
\ No newline at end of file