Allow taints on kubernetes nodes to determine scheduling of tests (#5755)

* allow a test group to specify taints for the nodes

* specify "effect" of toleration
modify test groups to use taints
modify preallocation to use taints

* add extra command line flags for unit test run
This commit is contained in:
Stefano Franz 2019-11-25 09:39:37 +00:00 committed by GitHub
parent ab3c4ac7f8
commit 16eb2fce78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 74 deletions

20
Jenkinsfile vendored
View File

@ -1,3 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@Library('existing-build-control')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@ -23,7 +24,7 @@ pipeline {
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForAllParallelUnitAndIntegrationTest --stacktrace"
" clean pushBuildImage preAllocateForAllParallelIntegrationTest preAllocateForAllParallelUnitTest --stacktrace"
}
sh "kubectl auth can-i get pods"
}
@ -31,7 +32,7 @@ pipeline {
stage('Corda Pull Request - Run Tests') {
parallel {
stage('Integration and Unit Tests') {
stage('Integration Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
@ -41,7 +42,20 @@ pipeline {
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${CHANGE_TARGET}\" " +
" deAllocateForAllParallelUnitAndIntegrationTest allParallelUnitAndIntegrationTest --stacktrace"
" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace"
}
}
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${CHANGE_TARGET}\" " +
" deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace"
}
}
}

View File

@ -601,7 +601,7 @@ task allParallelIntegrationTest(type: ParallelTestGroup) {
numberOfShards 10
streamOutput false
coresPerFork 5
memoryInGbPerFork 10
memoryInGbPerFork 12
distribute DistributeTestsBy.METHOD
}
task allParallelUnitTest(type: ParallelTestGroup) {
@ -609,9 +609,10 @@ task allParallelUnitTest(type: ParallelTestGroup) {
testGroups "test"
numberOfShards 10
streamOutput false
coresPerFork 5
memoryInGbPerFork 6
coresPerFork 3
memoryInGbPerFork 12
distribute DistributeTestsBy.CLASS
nodeTaints "small"
}
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"

View File

@ -118,10 +118,11 @@ class DistributedTesting implements Plugin<Project> {
numberOfCoresPerFork = testGrouping.getCoresToUse()
distribution = testGrouping.getDistribution()
podLogLevel = testGrouping.getLogLevel()
taints = testGrouping.getNodeTaints()
sidecarImage = testGrouping.sidecarImage
additionalArgs = testGrouping.additionalArgs
doFirst {
dockerTag = tagToUseForRunningTests ? (ImageBuilding.registryName + ":" + tagToUseForRunningTests) : (imagePushTask.imageName.get() + ":" + imagePushTask.tag.get())
sidecarImage = testGrouping.sidecarImage
additionalArgs = testGrouping.additionalArgs
}
}
def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.getName().capitalize()}", KubesReporting) {
@ -165,7 +166,7 @@ class DistributedTesting implements Plugin<Project> {
int numberOfPodsToRequest = testGrouping.getShardCount()
int coresPerPod = testGrouping.getCoresToUse()
int memoryGBPerPod = testGrouping.getGbOfMemory()
allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix)
allocator.allocatePods(numberOfPodsToRequest, coresPerPod, memoryGBPerPod, podPrefix, testGrouping.getNodeTaints())
}
}

View File

@ -1,13 +1,18 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.ContainerFluent;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodFluent;
import io.fabric8.kubernetes.api.model.PodSpecFluent;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.api.model.Toleration;
import io.fabric8.kubernetes.api.model.TolerationBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
@ -77,12 +82,13 @@ public class KubesTest extends DefaultTask {
String sidecarImage;
Boolean printOutput = false;
List<String> additionalArgs;
List<String> taints = Collections.emptyList();
Integer numberOfCoresPerFork = 4;
Integer memoryGbPerFork = 6;
public volatile List<File> testOutput = Collections.emptyList();
public volatile List<KubePodResult> containerResults = Collections.emptyList();
private final Set<String> remainingPods = Collections.synchronizedSet(new HashSet());
private final Set<String> remainingPods = Collections.synchronizedSet(new HashSet<>());
public static String NAMESPACE = "thisisatest";
@ -341,34 +347,7 @@ public class KubesTest extends DefaultTask {
}
private Pod buildPodRequestWithOnlyWorkerNode(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
.withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withType("DirectoryOrCreate")
.withPath("/tmp/gradle")
.endHostPath()
.endVolume()
.addNewVolume()
.withName("testruns")
.withNewPersistentVolumeClaim()
.withClaimName(pvc.getMetadata().getName())
.endPersistentVolumeClaim()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources()
return getBasePodDefinition(podName, pvc)
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
.addToRequests("memory", new Quantity(memoryGbPerFork.toString()))
.endResources()
@ -382,43 +361,13 @@ public class KubesTest extends DefaultTask {
}
private Pod buildPodRequestWithWorkerNodeAndDbContainer(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
.withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withType("DirectoryOrCreate")
.withPath("/tmp/gradle")
.endHostPath()
.endVolume()
.addNewVolume()
.withName("testruns")
.withNewPersistentVolumeClaim()
.withClaimName(pvc.getMetadata().getName())
.endPersistentVolumeClaim()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources()
return getBasePodDefinition(podName, pvc)
.addToRequests("cpu", new Quantity(Integer.valueOf(numberOfCoresPerFork - 1).toString()))
.addToRequests("memory", new Quantity(Integer.valueOf(memoryGbPerFork - 1).toString() + "Gi"))
.endResources()
.addNewVolumeMount().withName("gradlecache").withMountPath("/tmp/gradle").endVolumeMount()
.addNewVolumeMount().withName("testruns").withMountPath(TEST_RUN_DIR).endVolumeMount()
.endContainer()
.addNewContainer()
.withImage(sidecarImage)
.addNewEnv()
@ -440,6 +389,39 @@ public class KubesTest extends DefaultTask {
.build();
}
private ContainerFluent.ResourcesNested<PodSpecFluent.ContainersNested<PodFluent.SpecNested<PodBuilder>>> getBasePodDefinition(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
.withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withType("DirectoryOrCreate")
.withPath("/tmp/gradle")
.endHostPath()
.endVolume()
.addNewVolume()
.withName("testruns")
.withNewPersistentVolumeClaim()
.withClaimName(pvc.getMetadata().getName())
.endPersistentVolumeClaim()
.endVolume()
.withTolerations(taints.stream().map(taint -> new TolerationBuilder().withKey("key").withValue(taint).withOperator("Equal").withEffect("NoSchedule").build()).collect(Collectors.toList()))
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources();
}
private File startLogPumping(InputStream stdOutIs, int podIdx, File podLogsDirectory, boolean printOutput) throws IOException {
File outputFile = new File(podLogsDirectory, "container-" + podIdx + ".log");

View File

@ -17,6 +17,7 @@ public class ParallelTestGroup extends DefaultTask {
private PodLogLevel logLevel = PodLogLevel.INFO;
private String sidecarImage;
private List<String> additionalArgs = new ArrayList<>();
private List<String> taints = new ArrayList<>();
public DistributeTestsBy getDistribution() {
return distribution;
@ -46,9 +47,17 @@ public class ParallelTestGroup extends DefaultTask {
return logLevel;
}
public String getSidecarImage() { return sidecarImage; }
public String getSidecarImage() {
return sidecarImage;
}
public List<String> getAdditionalArgs() { return additionalArgs; }
public List<String> getAdditionalArgs() {
return additionalArgs;
}
public List<String> getNodeTaints(){
return new ArrayList<>(taints);
}
public void numberOfShards(int shards) {
this.shardCount = shards;
@ -95,4 +104,12 @@ public class ParallelTestGroup extends DefaultTask {
this.additionalArgs.addAll(additionalArgs);
}
public void nodeTaints(String... additionalArgs) {
nodeTaints(Arrays.asList(additionalArgs));
}
private void nodeTaints(List<String> additionalArgs) {
this.taints.addAll(additionalArgs);
}
}

View File

@ -1,6 +1,7 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.TolerationBuilder;
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobBuilder;
import io.fabric8.kubernetes.client.Config;
@ -36,12 +37,16 @@ public class PodAllocator {
this.logger = LoggerFactory.getLogger(PodAllocator.class);
}
public void allocatePods(Integer number, Integer coresPerPod, Integer memoryPerPod, String prefix) {
public void allocatePods(Integer number,
Integer coresPerPod,
Integer memoryPerPod,
String prefix,
List<String> taints) {
Config config = getConfig();
KubernetesClient client = new DefaultKubernetesClient(config);
List<Job> podsToRequest = IntStream.range(0, number).mapToObj(i -> buildJob("pa-" + prefix + i, coresPerPod, memoryPerPod)).collect(Collectors.toList());
List<Job> podsToRequest = IntStream.range(0, number).mapToObj(i -> buildJob("pa-" + prefix + i, coresPerPod, memoryPerPod, taints)).collect(Collectors.toList());
List<Job> createdJobs = podsToRequest.stream().map(requestedJob -> {
String msg = "PreAllocating " + requestedJob.getMetadata().getName();
if (logger instanceof org.gradle.api.logging.Logger) {
@ -112,7 +117,7 @@ public class PodAllocator {
}
Job buildJob(String podName, Integer coresPerPod, Integer memoryPerPod) {
Job buildJob(String podName, Integer coresPerPod, Integer memoryPerPod, List<String> taints) {
return new JobBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.withTtlSecondsAfterFinished(10)
@ -121,6 +126,7 @@ public class PodAllocator {
.withName(podName + "-pod")
.endMetadata()
.withNewSpec()
.withTolerations(taints.stream().map(taint -> new TolerationBuilder().withKey("key").withValue(taint).withOperator("Equal").withEffect("NoSchedule").build()).collect(Collectors.toList()))
.addNewContainer()
.withImage("busybox:latest")
.withCommand("sh")