add ability to group test types together (#5459)

* add ability to group test types together (see the usage sketch below)

* add ability to specify podCount for use in parallel testing

* remove compiler xml

* add Jenkinsfile to enable scanning

* trigger build

* add ability to specify what docker tag to use from outside of the build (see the invocation sketch below)

* fix docker work dir

* fix pipeline syntax issues

* use environment rather than `def`

* move agent restrictor outside of stages block

* use steps block

* more pipeline syntax fixes

* even more pipeline syntax fixes

* even more pipeline syntax fixes

* add kubenetize as property to image build

* move docker image cleanup to the end of the build rather than the start, to avoid interfering with co-located builds

* escape dollar on docker image remove command

* attempt to kill all existing jobs

* fix compile issue due to killall_jobs

* fix compile issue due to killall_jobs pt2

* fix spelling

* make all variables environment variables

* add logic to delete images locally after pushing

* wrap testing phase with try / finally so that junit reports are always evaluated

* change the behaviour around post build actions

* break implicit link between testing phase and image building phase, allowing testing to occur without a rebuild and push of image

* prepend registry name to provided tag

* allow tasks to specify whether they wish to stream output from containers

* add timestamps directive to Jenkinsfile to have timing info on output

* make KubesTest resilient against transient pod failures in k8s

* increase CPU request

* add logic to allow specifying container resource requests

* attempt to run unit and integration tests in parallel

* change unit tests to use 3 cores to allow co-location on 8c machines

* join grouped test names together to give the pod a meaningful name

* add step to renew token with GKE

* change renew step to use pods instead of nodes

* fix bug where memory request is not correctly passed to pod

* disable unit tests for now
Stefano Franz 2019-09-19 17:41:06 +00:00 committed by GitHub
parent 7bbbc71fdf
commit 974c45bb3c
9 changed files with 365 additions and 250 deletions
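
A minimal usage sketch of the grouping DSL introduced here, as it would appear in the root build.gradle (the task name and values are illustrative, not part of this commit; the DSL methods are the ones ParallelTestGroup defines further down):

task allParallelSmokeTest(type: ParallelTestGroup) {
    testGroups "smokeTest"      // Gradle Test task name(s) to fan out across pods
    numberOfShards 10           // number of pods the tests are spread over
    streamOutput false          // when false, only output from failed containers is echoed
    coresPerFork 4              // CPU request applied to each pod
    memoryInGbPerFork 8         // memory request (Gi) applied to each pod
}
// DistributedTesting would pick this up and create a "userDefinedAllParallelSmokeTest"
// KubesTest task, finalized by a matching KubesReporting task.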
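
The external-tag flow decouples testing from image building. A Jenkinsfile-style sketch mirroring the pipeline below (it assumes the same credentials and environment variables):

// Step 1: build and push the testing image under a tag chosen outside the build.
sh "./gradlew -Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
   "-Dkubenetize=true -Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\" clean pushBuildImage"
// Step 2: run the grouped tests without rebuilding; because -Ddocker.tag is set, the
// parallel test tasks drop their dependency on the push task and the registry name
// is prepended to the provided tag when resolving the pod image.
sh "./gradlew -DbuildId=\"\${BUILD_ID}\" -Dkubenetize=true " +
   "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\" allParallelIntegrationTest"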

Jenkinsfile

@ -2,6 +2,8 @@ killall_jobs()
pipeline {
agent { label 'k8s' }
options { timestamps() }
environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
@ -9,7 +11,7 @@ pipeline {
}
stages {
stage('Corda Pull Request Integration Tests - Generate Build Image') {
stage('Corda Pull Request - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
@ -19,26 +21,42 @@ pipeline {
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
}
}
}
stage('Corda Pull Request Integration Tests - Run Integration Tests') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
}
junit '**/build/test-results-xml/**/*.xml'
sh "kubectl auth can-i get pods"
}
}
stage('Clear testing images') {
steps {
sh """docker rmi -f \$(docker images | grep \${DOCKER_TAG_TO_USE} | awk '{print \$3}') || echo \"there were no images to delete\""""
stage('Corda Pull Request - Run Tests') {
parallel {
stage('Integration Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
}
post {
always {
junit '**/build/test-results-xml/**/*.xml'
}
}
}
// stage('Unit Tests') {
// steps {
// sh "./gradlew " +
// "-DbuildId=\"\${BUILD_ID}\" " +
// "-Dkubenetize=true " +
// "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
// " allParallelUnitTest"
// }
// post {
// always {
// junit '**/build/test-results-xml/**/*.xml'
// }
// }
// }
}
}
}
}

build.gradle

@ -1,4 +1,5 @@
import net.corda.testing.DistributedTesting
import net.corda.testing.ParallelTestGroup
import static org.gradle.api.JavaVersion.VERSION_1_8
import static org.gradle.api.JavaVersion.VERSION_11
@ -365,6 +366,10 @@ allprojects {
if (!JavaVersion.current().java8Compatible)
throw new GradleException("Corda requires Java 8, please upgrade to at least 1.8.0_$java8_minUpdateVersion")
configurations {
detekt
}
// Required for building out the fat JAR.
dependencies {
compile project(':node')
@ -384,6 +389,7 @@ dependencies {
runtime project(':finance:contracts')
runtime project(':webserver')
testCompile project(':test-utils')
detekt 'io.gitlab.arturbosch.detekt:detekt-cli:1.0.1'
}
jar {
@ -412,6 +418,26 @@ task jacocoRootReport(type: org.gradle.testing.jacoco.tasks.JacocoReport) {
}
}
task detekt(type: JavaExec) {
main = "io.gitlab.arturbosch.detekt.cli.Main"
classpath = configurations.detekt
def input = "$projectDir"
def config = "$projectDir/detekt-config.yml"
def baseline = "$projectDir/detekt-baseline.xml"
def params = ['-i', input, '-c', config, '-b', baseline]
args(params)
}
task detektBaseline(type: JavaExec) {
main = "io.gitlab.arturbosch.detekt.cli.Main"
classpath = configurations.detekt
def input = "$projectDir"
def config = "$projectDir/detekt-config.yml"
def baseline = "$projectDir/detekt-baseline.xml"
def params = ['-i', input, '-c', config, '-b', baseline, '--create-baseline']
args(params)
}
tasks.withType(Test) {
reports.html.destination = file("${reporting.baseDir}/${name}")
}
@ -553,32 +579,27 @@ buildScan {
termsOfServiceAgree = 'yes'
}
task allParallelIntegrationTest(type: ParallelTestGroup) {
testGroups "integrationTest"
numberOfShards 15
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
}
task allParallelUnitTest(type: ParallelTestGroup) {
testGroups "test"
numberOfShards 15
streamOutput false
coresPerFork 3
memoryInGbPerFork 6
}
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
numberOfShards 20
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
}
apply plugin: DistributedTesting
configurations {
detekt
}
dependencies {
detekt 'io.gitlab.arturbosch.detekt:detekt-cli:1.0.1'
}
task detekt(type: JavaExec) {
main = "io.gitlab.arturbosch.detekt.cli.Main"
classpath = configurations.detekt
def input = "$projectDir"
def config = "$projectDir/detekt-config.yml"
def baseline = "$projectDir/detekt-baseline.xml"
def params = ['-i', input, '-c', config, '-b', baseline]
args(params)
}
task detektBaseline(type: JavaExec) {
main = "io.gitlab.arturbosch.detekt.cli.Main"
classpath = configurations.detekt
def input = "$projectDir"
def config = "$projectDir/detekt-config.yml"
def baseline = "$projectDir/detekt-baseline.xml"
def params = ['-i', input, '-c', config, '-b', baseline, '--create-baseline']
args(params)
}

buildSrc/build.gradle

@ -35,6 +35,7 @@ dependencies {
compile gradleApi()
compile "io.fabric8:kubernetes-client:4.4.1"
compile 'org.apache.commons:commons-compress:1.19'
compile 'org.apache.commons:commons-lang3:3.9'
compile 'commons-codec:commons-codec:1.13'
compile "io.github.classgraph:classgraph:$class_graph_version"
compile "com.bmuschko:gradle-docker-plugin:5.0.0"

DistributedTesting.groovy

@ -1,10 +1,8 @@
package net.corda.testing
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.tasks.testing.Test
/**
@ -22,6 +20,7 @@ class DistributedTesting implements Plugin<Project> {
ensureImagePluginIsApplied(project)
ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding)
DockerPushImage imageBuildingTask = imagePlugin.pushTask
String providedTag = System.getProperty("docker.tag")
//in each subproject
//1. add the task to determine all tests within the module
@ -31,7 +30,7 @@ class DistributedTesting implements Plugin<Project> {
subProject.tasks.withType(Test) { Test task ->
ListTests testListerTask = createTestListingTasks(task, subProject)
Test modifiedTestTask = modifyTestTaskForParallelExecution(subProject, task, testListerTask)
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask)
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag)
}
}
@ -45,55 +44,57 @@ class DistributedTesting implements Plugin<Project> {
//first step is to create a single task which will invoke all the submodule tasks for each grouping
//ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc]
//ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc]
createGroupedParallelTestTasks(allKubesTestingTasksGroupedByType, project, imageBuildingTask)
Set<ParallelTestGroup> userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup))
Collection<ParallelTestGroup> userDefinedGroups = userGroups.forEach { testGrouping ->
List<KubesTest> groups = ((ParallelTestGroup) testGrouping).groups.collect {
allKubesTestingTasksGroupedByType.get(it)
}.flatten()
String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ")
def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" + testGrouping.name.capitalize(), KubesTest) {
if (!providedTag) {
dependsOn imageBuildingTask
}
numberOfPods = testGrouping.getShardCount()
printOutput = testGrouping.printToStdOut
fullTaskToExecutePath = superListOfTasks
taskToExecuteName = testGrouping.groups.join("And")
memoryGbPerFork = testGrouping.gbOfMemory
numberOfCoresPerFork = testGrouping.coresToUse
doFirst {
dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get())
}
}
def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.name.capitalize()}", KubesReporting) {
dependsOn userDefinedParallelTask
destinationDir new File(project.rootProject.getBuildDir(), "userDefinedReports${testGrouping.name.capitalize()}")
doFirst {
destinationDir.deleteDir()
shouldPrintOutput = !testGrouping.printToStdOut
podResults = userDefinedParallelTask.containerResults
reportOn(userDefinedParallelTask.testOutput)
}
}
userDefinedParallelTask.finalizedBy(reportOnAllTask)
testGrouping.dependsOn(userDefinedParallelTask)
}
}
}
private List<Task> createGroupedParallelTestTasks(Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType, Project project, DockerPushImage imageBuildingTask) {
allKubesTestingTasksGroupedByType.entrySet().collect { entry ->
def taskType = entry.key
def allTasksOfType = entry.value
def allParallelTask = project.rootProject.tasks.create("allParallel" + taskType.capitalize(), KubesTest) {
dependsOn imageBuildingTask
printOutput = true
fullTaskToExecutePath = allTasksOfType.collect { task -> task.fullTaskToExecutePath }.join(" ")
taskToExecuteName = taskType
doFirst {
dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()
}
}
//second step is to create a task to use the reports output by the parallel test task
def reportOnAllTask = project.rootProject.tasks.create("reportAllParallel${taskType.capitalize()}", KubesReporting) {
dependsOn allParallelTask
destinationDir new File(project.rootProject.getBuildDir(), "allResults${taskType.capitalize()}")
doFirst {
destinationDir.deleteDir()
podResults = allParallelTask.containerResults
reportOn(allParallelTask.testOutput)
}
}
//invoke this report task after parallel testing
allParallelTask.finalizedBy(reportOnAllTask)
project.logger.info "Created task: ${allParallelTask.getPath()} to enable testing on kubenetes for tasks: ${allParallelTask.fullTaskToExecutePath}"
project.logger.info "Created task: ${reportOnAllTask.getPath()} to generate test html output for task ${allParallelTask.getPath()}"
return allParallelTask
}
}
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask) {
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()
KubesTest createdParallelTestTask = projectContainingTask.tasks.create("parallel" + capitalizedTaskName, KubesTest) {
dependsOn imageBuildingTask
if (!providedTag) {
dependsOn imageBuildingTask
}
printOutput = true
fullTaskToExecutePath = task.getPath()
taskToExecuteName = taskName
doFirst {
dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()
dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get())
}
}
projectContainingTask.logger.info "Created task: ${createdParallelTestTask.getPath()} to enable testing on kubernetes for task: ${task.getPath()}"

ImageBuilding.groovy

@ -1,10 +1,7 @@
package net.corda.testing
import com.bmuschko.gradle.docker.DockerRegistryCredentials
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer
import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer
import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer
import com.bmuschko.gradle.docker.tasks.container.*
import com.bmuschko.gradle.docker.tasks.image.*
import org.gradle.api.GradleException
import org.gradle.api.Plugin
@ -16,6 +13,7 @@ import org.gradle.api.Project
*/
class ImageBuilding implements Plugin<Project> {
public static final String registryName = "stefanotestingcr.azurecr.io/testing"
DockerPushImage pushTask
@Override
@ -25,7 +23,7 @@ class ImageBuilding implements Plugin<Project> {
registryCredentialsForPush.username.set("stefanotestingcr")
registryCredentialsForPush.password.set(System.getProperty("docker.push.password") ? System.getProperty("docker.push.password") : "")
DockerPullImage pullTask = project.tasks.create("pullBaseImage", DockerPullImage){
DockerPullImage pullTask = project.tasks.create("pullBaseImage", DockerPullImage) {
repository = "stefanotestingcr.azurecr.io/buildbase"
tag = "latest"
doFirst {
@ -83,33 +81,41 @@ class ImageBuilding implements Plugin<Project> {
targetContainerId createBuildContainer.getContainerId()
}
DockerTagImage tagBuildImageResult = project.tasks.create('tagBuildImageResult', DockerTagImage) {
dependsOn commitBuildImageResult
imageId = commitBuildImageResult.getImageId()
tag = System.getProperty("docker.provided.tag") ? System.getProperty("docker.provided.tag") : "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
repository = "stefanotestingcr.azurecr.io/testing"
tag = System.getProperty("docker.provided.tag") ? System.getProperty("docker.provided.tag") : "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
repository = registryName
}
if (System.getProperty("docker.tag")) {
DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) {
doFirst {
registryCredentials = registryCredentialsForPush
}
imageName = "stefanotestingcr.azurecr.io/testing"
tag = System.getProperty("docker.tag")
DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) {
dependsOn tagBuildImageResult
doFirst {
registryCredentials = registryCredentialsForPush
}
this.pushTask = pushBuildImage
} else {
DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) {
dependsOn tagBuildImageResult
doFirst {
registryCredentials = registryCredentialsForPush
}
imageName = "stefanotestingcr.azurecr.io/testing"
tag = tagBuildImageResult.tag
}
this.pushTask = pushBuildImage
imageName = registryName
tag = tagBuildImageResult.tag
}
this.pushTask = pushBuildImage
DockerRemoveContainer deleteContainer = project.tasks.create('deleteBuildContainer', DockerRemoveContainer) {
dependsOn pushBuildImage
targetContainerId createBuildContainer.getContainerId()
}
DockerRemoveImage deleteTaggedImage = project.tasks.create('deleteTaggedImage', DockerRemoveImage) {
dependsOn pushBuildImage
force = true
targetImageId commitBuildImageResult.getImageId()
}
DockerRemoveImage deleteBuildImage = project.tasks.create('deleteBuildImage', DockerRemoveImage) {
dependsOn deleteContainer, deleteTaggedImage
force = true
targetImageId buildDockerImageForSource.getImageId()
}
if (System.getProperty("docker.keep.image") == null) {
pushBuildImage.finalizedBy(deleteContainer, deleteBuildImage, deleteTaggedImage)
}
}
}

KubesTest.groovy

@ -16,7 +16,6 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.IntStream
@ -29,6 +28,8 @@ class KubesTest extends DefaultTask {
String fullTaskToExecutePath
String taskToExecuteName
Boolean printOutput = false
Integer numberOfCoresPerFork = 4
Integer memoryGbPerFork = 6
public volatile List<File> testOutput = Collections.emptyList()
public volatile List<KubePodResult> containerResults = Collections.emptyList()
@ -38,7 +39,6 @@ class KubesTest extends DefaultTask {
int numberOfPods = 20
int timeoutInMinutesForPodToStart = 60
@TaskAction
void runTestsOnKubes() {
@ -53,7 +53,7 @@ class KubesTest extends DefaultTask {
def currentUser = System.getProperty("user.name") ? System.getProperty("user.name") : "UNKNOWN_USER"
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode())).toString(36).toLowerCase()
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase()
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase()
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
@ -77,164 +77,179 @@ class KubesTest extends DefaultTask {
//it's possible that a pod is being deleted by the original build, which can lead to racy conditions
}
List<CompletableFuture<KubePodResult>> podCreationFutures = IntStream.range(0, numberOfPods).mapToObj({ i ->
CompletableFuture.supplyAsync({
File outputFile = Files.createTempFile("container", ".log").toFile()
String podName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase()
Pod podRequest = buildPod(podName)
project.logger.lifecycle("created pod: " + podName)
Pod createdPod = client.pods().inNamespace(namespace).create(podRequest)
Runtime.getRuntime().addShutdownHook({
println "Deleting pod: " + podName
client.pods().delete(createdPod)
})
CompletableFuture<Void> waiter = new CompletableFuture<Void>()
KubePodResult result = new KubePodResult(createdPod, waiter, outputFile)
startBuildAndLogging(client, namespace, numberOfPods, i, podName, printOutput, waiter, { int resultCode ->
println podName + " has completed with resultCode=$resultCode"
result.setResultCode(resultCode)
}, outputFile)
return result
}, executorService)
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj({ i ->
String podName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase()
runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3)
}).collect(Collectors.toList())
def binaryFileFutures = podCreationFutures.collect { creationFuture ->
return creationFuture.thenComposeAsync({ podResult ->
return podResult.waiter.thenApply {
project.logger.lifecycle("Successfully terminated log streaming for " + podResult.createdPod.getMetadata().getName())
println "Gathering test results from ${podResult.createdPod.metadata.name}"
def binaryResults = downloadTestXmlFromPod(client, namespace, podResult.createdPod)
project.logger.lifecycle("deleting: " + podResult.createdPod.getMetadata().getName())
client.resource(podResult.createdPod).delete()
return binaryResults
}
}, singleThreadedExecutor)
}
def allFilesDownloadedFuture = CompletableFuture.allOf(*binaryFileFutures.toArray(new CompletableFuture[0])).thenApply {
def allBinaryFiles = binaryFileFutures.collect { future ->
Collection<File> binaryFiles = future.get()
return binaryFiles
}.flatten()
this.testOutput = Collections.synchronizedList(allBinaryFiles)
return allBinaryFiles
}
allFilesDownloadedFuture.get()
this.containerResults = podCreationFutures.collect { it -> it.get() }
this.testOutput = Collections.synchronizedList(futures.collect { it -> it.get().binaryResults }.flatten())
this.containerResults = futures.collect { it -> it.get() }
}
void startBuildAndLogging(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
CompletableFuture<Void> waiter,
Consumer<Integer> resultSetter,
File outputFileForContainer) {
try {
project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build")
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES)
project.logger.lifecycle("pod " + podName + " has started, executing build")
Watch eventWatch = client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
@Override
void eventReceived(Watcher.Action action, Pod resource) {
project.logger.lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name())
}
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries) {
@Override
void onClose(KubernetesClientException cause) {
}
})
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<KubePodResult>()
def stdOutOs = new PipedOutputStream()
def stdOutIs = new PipedInputStream(4096)
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
executorService.submit({
int tryCount = 0
Pod createdPod = null
while (tryCount < numberOfRetries) {
try {
Pod podRequest = buildPod(podName)
project.logger.lifecycle("requesting pod: " + podName)
createdPod = client.pods().inNamespace(namespace).create(podRequest)
project.logger.lifecycle("scheduled pod: " + podName)
File outputFile = Files.createTempFile("container", ".log").toFile()
attachStatusListenerToPod(client, namespace, podName)
schedulePodForDeleteOnShutdown(podName, client, createdPod)
waitForPodToStart(podName, client, namespace)
def stdOutOs = new PipedOutputStream()
def stdOutIs = new PipedInputStream(4096)
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
KubePodResult result = new KubePodResult(createdPod, null, outputFile)
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>()
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result)
stdOutIs.connect(stdOutOs)
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener).exec(getBuildCommand(numberOfPods, podIdx))
def terminatingListener = new ExecListener() {
@Override
void onOpen(Response response) {
project.logger.lifecycle("Build started on pod " + podName)
}
@Override
void onFailure(Throwable t, Response response) {
project.logger.lifecycle("Received error from rom pod " + podName)
waiter.completeExceptionally(t)
}
@Override
void onClose(int code, String reason) {
project.logger.lifecycle("Received onClose() from pod " + podName + " with returnCode=" + code)
startLogPumping(outputFile, stdOutIs, podIdx, printOutput)
KubePodResult execResult = waiter.join()
project.logger.lifecycle("build has ended on on pod ${podName} (${podIdx}/${numberOfPods})")
project.logger.lifecycle "Gathering test results from ${execResult.createdPod.metadata.name}"
def binaryResults = downloadTestXmlFromPod(client, namespace, execResult.createdPod)
project.logger.lifecycle("deleting: " + execResult.createdPod.getMetadata().getName())
client.resource(execResult.createdPod).delete()
result.binaryResults = binaryResults
toReturn.complete(result)
break
} catch (Exception e) {
logger.error("Encountered error during testing cycle on pod ${podName} (${podIdx}/${numberOfPods})", e)
try {
def errChannelContents = errChannelStream.toString()
println errChannelContents
Status status = Serialization.unmarshal(errChannelContents, Status.class);
resultSetter.accept(status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0)
waiter.complete()
} catch (Exception e) {
waiter.completeExceptionally(e)
if (createdPod) {
client.pods().delete(createdPod)
while (client.pods().inNamespace(namespace).list().getItems().find { p -> p.metadata.name == podName }) {
logger.warn("pod ${podName} has not been deleted, waiting 1s")
Thread.sleep(1000)
}
}
} catch (Exception ignored) {
}
tryCount++
logger.lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times")
}
}
if (tryCount >= numberOfRetries) {
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"))
}
})
return toReturn
}
stdOutIs.connect(stdOutOs)
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(terminatingListener).exec(getBuildCommand(numberOfPods, podIdx))
project.logger.lifecycle("Pod: " + podName + " has started ")
Thread loggingThread = new Thread({ ->
BufferedWriter out = null
BufferedReader br = null
try {
out = new BufferedWriter(new FileWriter(outputFileForContainer))
br = new BufferedReader(new InputStreamReader(stdOutIs))
String line
while ((line = br.readLine()) != null) {
def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim()
if (printOutput) {
project.logger.lifecycle(toWrite)
}
out.println(toWrite)
void startLogPumping(File outputFile, stdOutIs, podIdx, boolean printOutput) {
Thread loggingThread = new Thread({ ->
BufferedWriter out = null
BufferedReader br = null
try {
out = new BufferedWriter(new FileWriter(outputFile))
br = new BufferedReader(new InputStreamReader(stdOutIs))
String line
while ((line = br.readLine()) != null) {
def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim()
if (printOutput) {
project.logger.lifecycle(toWrite)
}
} catch (IOException ignored) {
out.println(toWrite)
}
finally {
out?.close()
br?.close()
}
})
} catch (IOException ignored) {
}
finally {
out?.close()
br?.close()
}
})
loggingThread.setDaemon(true)
loggingThread.start()
} catch (InterruptedException ignored) {
throw new GradleException("Could not get slot on cluster within timeout")
loggingThread.setDaemon(true)
loggingThread.start()
}
ExecListener buildExecListenerForPod(podName, errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
new ExecListener() {
@Override
void onOpen(Response response) {
project.logger.lifecycle("Build started on pod " + podName)
}
@Override
void onFailure(Throwable t, Response response) {
project.logger.lifecycle("Received error from rom pod " + podName)
waitingFuture.completeExceptionally(t)
}
@Override
void onClose(int code, String reason) {
project.logger.lifecycle("Received onClose() from pod " + podName + " with returnCode=" + code)
try {
def errChannelContents = errChannelStream.toString()
Status status = Serialization.unmarshal(errChannelContents, Status.class);
result.resultCode = status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0
waitingFuture.complete(result)
} catch (Exception e) {
waitingFuture.completeExceptionally(e)
}
}
}
}
void schedulePodForDeleteOnShutdown(String podName, client, Pod createdPod) {
project.logger.info("attaching shutdown hook for pod ${podName}")
Runtime.getRuntime().addShutdownHook({
println "Deleting pod: " + podName
client.pods().delete(createdPod)
})
}
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
@Override
void eventReceived(Watcher.Action action, Pod resource) {
project.logger.lifecycle("[StatusChange] pod ${resource.getMetadata().getName()} ${action.name()} (${resource.status.phase})")
}
@Override
void onClose(KubernetesClientException cause) {
}
})
}
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build")
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES)
project.logger.lifecycle("pod " + podName + " has started, executing build")
}
Pod buildPod(String podName) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withPath("/gradle")
.withPath("/tmp/gradle")
.withType("DirectoryOrCreate")
.endHostPath()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
//max container life time is 60min
.withArgs("-c", "sleep 1800")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
@ -243,8 +258,8 @@ class KubesTest extends DefaultTask {
.endEnv()
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity("2"))
.addToRequests("memory", new Quantity("6Gi"))
.addToRequests("cpu", new Quantity("${numberOfCoresPerFork}"))
.addToRequests("memory", new Quantity("${memoryGbPerFork}Gi"))
.endResources()
.addNewVolumeMount()
.withName("gradlecache")
@ -276,7 +291,7 @@ class KubesTest extends DefaultTask {
tempDir.toFile().mkdirs()
}
project.logger.lifecycle("saving to " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath())
project.logger.lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath())
client.pods()
.inNamespace(namespace)
.withName(podName)

ParallelTestGroup.groovy

@ -0,0 +1,41 @@
package net.corda.testing
import org.gradle.api.DefaultTask
import org.gradle.api.tasks.TaskAction
class ParallelTestGroup extends DefaultTask {
List<String> groups = new ArrayList<>()
int shardCount = 20
int coresToUse = 4
int gbOfMemory = 4
boolean printToStdOut = true
void numberOfShards(int shards){
this.shardCount = shards
}
void coresPerFork(int cores){
this.coresToUse = cores
}
void memoryInGbPerFork(int gb){
this.gbOfMemory = gb
}
//when this is false, only containers with "failed" exit codes will be printed to stdout
void streamOutput(boolean print){
this.printToStdOut = print
}
void testGroups(String... group) {
testGroups(group.toList())
}
void testGroups(List<String> group) {
group.forEach {
groups.add(it)
}
}
}

KubePodResult.java

@ -3,6 +3,8 @@ package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class KubePodResult {
@ -11,6 +13,7 @@ public class KubePodResult {
private final CompletableFuture<Void> waiter;
private volatile Integer resultCode = 255;
private final File output;
private volatile Collection<File> binaryResults = Collections.emptyList();
KubePodResult(Pod createdPod, CompletableFuture<Void> waiter, File output) {
this.createdPod = createdPod;

KubesReporting.java

@ -16,6 +16,7 @@
package net.corda.testing;
import org.apache.commons.compress.utils.IOUtils;
import org.gradle.api.DefaultTask;
import org.gradle.api.GradleException;
import org.gradle.api.Transformer;
@ -33,6 +34,8 @@ import org.gradle.internal.operations.BuildOperationExecutor;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -49,6 +52,7 @@ public class KubesReporting extends DefaultTask {
private File destinationDir = new File(getProject().getBuildDir(), "test-reporting");
private List<Object> results = new ArrayList<Object>();
List<KubePodResult> podResults = new ArrayList<>();
boolean shouldPrintOutput = true;
public KubesReporting() {
//force this task to always run, as it's responsible for parsing exit codes
@ -147,12 +151,17 @@ public class KubesReporting extends DefaultTask {
if (!containersWithNonZeroReturnCodes.isEmpty()) {
String reportUrl = new ConsoleRenderer().asClickableFileUrl(new File(destinationDir, "index.html"));
String containerOutputs = containersWithNonZeroReturnCodes.stream().map(KubePodResult::getOutput).map(file -> new ConsoleRenderer().asClickableFileUrl(file)).reduce("",
(s, s2) -> s + "\n" + s2
);
String message = "remote build failed, check test report at " + reportUrl + "\n and container outputs at " + containerOutputs;
if (shouldPrintOutput){
containersWithNonZeroReturnCodes.forEach(container -> {
try {
System.out.println("\n##### CONTAINER OUTPUT START #####");
IOUtils.copy(new FileInputStream(container.getOutput()), System.out);
System.out.println("##### CONTAINER OUTPUT END #####\n");
} catch (IOException ignored) {
}
});
}
String message = "remote build failed, check test report at " + reportUrl;
throw new GradleException(message);
}
} else {