diff --git a/Jenkinsfile b/Jenkinsfile index 49b3ca6686..c6155604fd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -2,6 +2,8 @@ killall_jobs() pipeline { agent { label 'k8s' } + options { timestamps() } + environment { DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}" EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}" @@ -9,7 +11,7 @@ pipeline { } stages { - stage('Corda Pull Request Integration Tests - Generate Build Image') { + stage('Corda Pull Request - Generate Build Image') { steps { withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) { sh "./gradlew " + @@ -19,26 +21,42 @@ pipeline { "-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" + " clean pushBuildImage" } - } - } - stage('Corda Pull Request Integration Tests - Run Integration Tests') { - steps { - withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) { - sh "./gradlew " + - "-DbuildId=\"\${BUILD_ID}\" " + - "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + - "-Dkubenetize=true " + - "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " allParallelIntegrationTest" - } - junit '**/build/test-results-xml/**/*.xml' + sh "kubectl auth can-i get pods" } } - stage('Clear testing images') { - steps { - sh """docker rmi -f \$(docker images | grep \${DOCKER_TAG_TO_USE} | awk '{print \$3}') || echo \"there were no images to delete\"""" + stage('Corda Pull Request - Run Tests') { + parallel { + stage('Integration Tests') { + steps { + sh "./gradlew " + + "-DbuildId=\"\${BUILD_ID}\" " + + "-Dkubenetize=true " + + "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + + " allParallelIntegrationTest" + } + post { + always { + junit '**/build/test-results-xml/**/*.xml' + } + } + } +// stage('Unit Tests') { +// steps { +// sh "./gradlew " + +// "-DbuildId=\"\${BUILD_ID}\" " + +// "-Dkubenetize=true " + +// "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + +// " allParallelUnitTest" +// } +// post { +// always { +// junit '**/build/test-results-xml/**/*.xml' +// } +// } +// } } + } } } diff --git a/build.gradle b/build.gradle index 5e3eba799d..4f3b01b2aa 100644 --- a/build.gradle +++ b/build.gradle @@ -1,4 +1,5 @@ import net.corda.testing.DistributedTesting +import net.corda.testing.ParallelTestGroup import static org.gradle.api.JavaVersion.VERSION_1_8 import static org.gradle.api.JavaVersion.VERSION_11 @@ -365,6 +366,10 @@ allprojects { if (!JavaVersion.current().java8Compatible) throw new GradleException("Corda requires Java 8, please upgrade to at least 1.8.0_$java8_minUpdateVersion") +configurations { + detekt +} + // Required for building out the fat JAR. dependencies { compile project(':node') @@ -384,6 +389,7 @@ dependencies { runtime project(':finance:contracts') runtime project(':webserver') testCompile project(':test-utils') + detekt 'io.gitlab.arturbosch.detekt:detekt-cli:1.0.1' } jar { @@ -412,6 +418,26 @@ task jacocoRootReport(type: org.gradle.testing.jacoco.tasks.JacocoReport) { } } +task detekt(type: JavaExec) { + main = "io.gitlab.arturbosch.detekt.cli.Main" + classpath = configurations.detekt + def input = "$projectDir" + def config = "$projectDir/detekt-config.yml" + def baseline = "$projectDir/detekt-baseline.xml" + def params = ['-i', input, '-c', config, '-b', baseline] + args(params) +} + +task detektBaseline(type: JavaExec) { + main = "io.gitlab.arturbosch.detekt.cli.Main" + classpath = configurations.detekt + def input = "$projectDir" + def config = "$projectDir/detekt-config.yml" + def baseline = "$projectDir/detekt-baseline.xml" + def params = ['-i', input, '-c', config, '-b', baseline, '--create-baseline'] + args(params) +} + tasks.withType(Test) { reports.html.destination = file("${reporting.baseDir}/${name}") } @@ -553,32 +579,27 @@ buildScan { termsOfServiceAgree = 'yes' } +task allParallelIntegrationTest(type: ParallelTestGroup) { + testGroups "integrationTest" + numberOfShards 15 + streamOutput false + coresPerFork 6 + memoryInGbPerFork 10 +} +task allParallelUnitTest(type: ParallelTestGroup) { + testGroups "test" + numberOfShards 15 + streamOutput false + coresPerFork 3 + memoryInGbPerFork 6 +} +task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { + testGroups "test", "integrationTest" + numberOfShards 20 + streamOutput false + coresPerFork 6 + memoryInGbPerFork 10 +} apply plugin: DistributedTesting -configurations { - detekt -} -dependencies { - detekt 'io.gitlab.arturbosch.detekt:detekt-cli:1.0.1' -} - -task detekt(type: JavaExec) { - main = "io.gitlab.arturbosch.detekt.cli.Main" - classpath = configurations.detekt - def input = "$projectDir" - def config = "$projectDir/detekt-config.yml" - def baseline = "$projectDir/detekt-baseline.xml" - def params = ['-i', input, '-c', config, '-b', baseline] - args(params) -} - -task detektBaseline(type: JavaExec) { - main = "io.gitlab.arturbosch.detekt.cli.Main" - classpath = configurations.detekt - def input = "$projectDir" - def config = "$projectDir/detekt-config.yml" - def baseline = "$projectDir/detekt-baseline.xml" - def params = ['-i', input, '-c', config, '-b', baseline, '--create-baseline'] - args(params) -} diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 8c21690366..58443e65e1 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -35,6 +35,7 @@ dependencies { compile gradleApi() compile "io.fabric8:kubernetes-client:4.4.1" compile 'org.apache.commons:commons-compress:1.19' + compile 'org.apache.commons:commons-lang3:3.9' compile 'commons-codec:commons-codec:1.13' compile "io.github.classgraph:classgraph:$class_graph_version" compile "com.bmuschko:gradle-docker-plugin:5.0.0" diff --git a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy index 5965908cd2..dff995e2bd 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy @@ -1,10 +1,8 @@ package net.corda.testing - import com.bmuschko.gradle.docker.tasks.image.DockerPushImage import org.gradle.api.Plugin import org.gradle.api.Project -import org.gradle.api.Task import org.gradle.api.tasks.testing.Test /** @@ -22,6 +20,7 @@ class DistributedTesting implements Plugin<Project> { ensureImagePluginIsApplied(project) ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding) DockerPushImage imageBuildingTask = imagePlugin.pushTask + String providedTag = System.getProperty("docker.tag") //in each subproject //1. add the task to determine all tests within the module @@ -31,7 +30,7 @@ class DistributedTesting implements Plugin<Project> { subProject.tasks.withType(Test) { Test task -> ListTests testListerTask = createTestListingTasks(task, subProject) Test modifiedTestTask = modifyTestTaskForParallelExecution(subProject, task, testListerTask) - KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask) + KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask, providedTag) } } @@ -45,55 +44,57 @@ class DistributedTesting implements Plugin<Project> { //first step is to create a single task which will invoke all the submodule tasks for each grouping //ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc] //ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc] - createGroupedParallelTestTasks(allKubesTestingTasksGroupedByType, project, imageBuildingTask) + Set<ParallelTestGroup> userGroups = new HashSet<>(project.tasks.withType(ParallelTestGroup)) + + Collection<ParallelTestGroup> userDefinedGroups = userGroups.forEach { testGrouping -> + List<KubesTest> groups = ((ParallelTestGroup) testGrouping).groups.collect { + allKubesTestingTasksGroupedByType.get(it) + }.flatten() + String superListOfTasks = groups.collect { it.fullTaskToExecutePath }.join(" ") + + def userDefinedParallelTask = project.rootProject.tasks.create("userDefined" + testGrouping.name.capitalize(), KubesTest) { + if (!providedTag) { + dependsOn imageBuildingTask + } + numberOfPods = testGrouping.getShardCount() + printOutput = testGrouping.printToStdOut + fullTaskToExecutePath = superListOfTasks + taskToExecuteName = testGrouping.groups.join("And") + memoryGbPerFork = testGrouping.gbOfMemory + numberOfCoresPerFork = testGrouping.coresToUse + doFirst { + dockerTag = dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()) + } + } + def reportOnAllTask = project.rootProject.tasks.create("userDefinedReports${testGrouping.name.capitalize()}", KubesReporting) { + dependsOn userDefinedParallelTask + destinationDir new File(project.rootProject.getBuildDir(), "userDefinedReports${testGrouping.name.capitalize()}") + doFirst { + destinationDir.deleteDir() + shouldPrintOutput = !testGrouping.printToStdOut + podResults = userDefinedParallelTask.containerResults + reportOn(userDefinedParallelTask.testOutput) + } + } + userDefinedParallelTask.finalizedBy(reportOnAllTask) + testGrouping.dependsOn(userDefinedParallelTask) + } } } - private List<Task> createGroupedParallelTestTasks(Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType, Project project, DockerPushImage imageBuildingTask) { - allKubesTestingTasksGroupedByType.entrySet().collect { entry -> - def taskType = entry.key - def allTasksOfType = entry.value - def allParallelTask = project.rootProject.tasks.create("allParallel" + taskType.capitalize(), KubesTest) { - dependsOn imageBuildingTask - printOutput = true - fullTaskToExecutePath = allTasksOfType.collect { task -> task.fullTaskToExecutePath }.join(" ") - taskToExecuteName = taskType - doFirst { - dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get() - } - } - - //second step is to create a task to use the reports output by the parallel test task - def reportOnAllTask = project.rootProject.tasks.create("reportAllParallel${taskType.capitalize()}", KubesReporting) { - dependsOn allParallelTask - destinationDir new File(project.rootProject.getBuildDir(), "allResults${taskType.capitalize()}") - doFirst { - destinationDir.deleteDir() - podResults = allParallelTask.containerResults - reportOn(allParallelTask.testOutput) - } - } - - //invoke this report task after parallel testing - allParallelTask.finalizedBy(reportOnAllTask) - project.logger.info "Created task: ${allParallelTask.getPath()} to enable testing on kubenetes for tasks: ${allParallelTask.fullTaskToExecutePath}" - project.logger.info "Created task: ${reportOnAllTask.getPath()} to generate test html output for task ${allParallelTask.getPath()}" - return allParallelTask - - } - } - - private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask) { + private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask, String providedTag) { def taskName = task.getName() def capitalizedTaskName = task.getName().capitalize() KubesTest createdParallelTestTask = projectContainingTask.tasks.create("parallel" + capitalizedTaskName, KubesTest) { - dependsOn imageBuildingTask + if (!providedTag) { + dependsOn imageBuildingTask + } printOutput = true fullTaskToExecutePath = task.getPath() taskToExecuteName = taskName doFirst { - dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get() + dockerTag = providedTag ? ImageBuilding.registryName + ":" + providedTag : (imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()) } } projectContainingTask.logger.info "Created task: ${createdParallelTestTask.getPath()} to enable testing on kubenetes for task: ${task.getPath()}" diff --git a/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.groovy b/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.groovy index 54f6e91f74..dbbe22a5ba 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/ImageBuilding.groovy @@ -1,10 +1,7 @@ package net.corda.testing import com.bmuschko.gradle.docker.DockerRegistryCredentials -import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer -import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer -import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer -import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer +import com.bmuschko.gradle.docker.tasks.container.* import com.bmuschko.gradle.docker.tasks.image.* import org.gradle.api.GradleException import org.gradle.api.Plugin @@ -16,6 +13,7 @@ import org.gradle.api.Project */ class ImageBuilding implements Plugin<Project> { + public static final String registryName = "stefanotestingcr.azurecr.io/testing" DockerPushImage pushTask @Override @@ -25,7 +23,7 @@ class ImageBuilding implements Plugin<Project> { registryCredentialsForPush.username.set("stefanotestingcr") registryCredentialsForPush.password.set(System.getProperty("docker.push.password") ? System.getProperty("docker.push.password") : "") - DockerPullImage pullTask = project.tasks.create("pullBaseImage", DockerPullImage){ + DockerPullImage pullTask = project.tasks.create("pullBaseImage", DockerPullImage) { repository = "stefanotestingcr.azurecr.io/buildbase" tag = "latest" doFirst { @@ -83,33 +81,41 @@ class ImageBuilding implements Plugin<Project> { targetContainerId createBuildContainer.getContainerId() } + DockerTagImage tagBuildImageResult = project.tasks.create('tagBuildImageResult', DockerTagImage) { dependsOn commitBuildImageResult imageId = commitBuildImageResult.getImageId() - tag = System.getProperty("docker.provided.tag") ? System.getProperty("docker.provided.tag") : "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}" - repository = "stefanotestingcr.azurecr.io/testing" + tag = System.getProperty("docker.provided.tag") ? System.getProperty("docker.provided.tag") : "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}" + repository = registryName } - if (System.getProperty("docker.tag")) { - DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) { - doFirst { - registryCredentials = registryCredentialsForPush - } - imageName = "stefanotestingcr.azurecr.io/testing" - tag = System.getProperty("docker.tag") + DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) { + dependsOn tagBuildImageResult + doFirst { + registryCredentials = registryCredentialsForPush } - this.pushTask = pushBuildImage - } else { - DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) { - dependsOn tagBuildImageResult - doFirst { - registryCredentials = registryCredentialsForPush - } - imageName = "stefanotestingcr.azurecr.io/testing" - tag = tagBuildImageResult.tag - } - this.pushTask = pushBuildImage + imageName = registryName + tag = tagBuildImageResult.tag } + this.pushTask = pushBuildImage + + DockerRemoveContainer deleteContainer = project.tasks.create('deleteBuildContainer', DockerRemoveContainer) { + dependsOn pushBuildImage + targetContainerId createBuildContainer.getContainerId() + } + DockerRemoveImage deleteTaggedImage = project.tasks.create('deleteTaggedImage', DockerRemoveImage) { + dependsOn pushBuildImage + force = true + targetImageId commitBuildImageResult.getImageId() + } + DockerRemoveImage deleteBuildImage = project.tasks.create('deleteBuildImage', DockerRemoveImage) { + dependsOn deleteContainer, deleteTaggedImage + force = true + targetImageId buildDockerImageForSource.getImageId() + } + if (System.getProperty("docker.keep.image") == null) { + pushBuildImage.finalizedBy(deleteContainer, deleteBuildImage, deleteTaggedImage) + } } } \ No newline at end of file diff --git a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.groovy b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.groovy index b9a4896597..7ed6bb15f0 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.groovy @@ -16,7 +16,6 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit -import java.util.function.Consumer import java.util.stream.Collectors import java.util.stream.IntStream @@ -29,6 +28,8 @@ class KubesTest extends DefaultTask { String fullTaskToExecutePath String taskToExecuteName Boolean printOutput = false + Integer numberOfCoresPerFork = 4 + Integer memoryGbPerFork = 6 public volatile List<File> testOutput = Collections.emptyList() public volatile List<KubePodResult> containerResults = Collections.emptyList() @@ -38,7 +39,6 @@ class KubesTest extends DefaultTask { int numberOfPods = 20 int timeoutInMinutesForPodToStart = 60 - @TaskAction void runTestsOnKubes() { @@ -53,7 +53,7 @@ class KubesTest extends DefaultTask { def currentUser = System.getProperty("user.name") ? System.getProperty("user.name") : "UNKNOWN_USER" - String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode())).toString(36).toLowerCase() + String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase() String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase() io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder() @@ -77,164 +77,179 @@ class KubesTest extends DefaultTask { //it's possible that a pod is being deleted by the original build, this can lead to racey conditions } - - List<CompletableFuture<KubePodResult>> podCreationFutures = IntStream.range(0, numberOfPods).mapToObj({ i -> - CompletableFuture.supplyAsync({ - File outputFile = Files.createTempFile("container", ".log").toFile() - String podName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase() - Pod podRequest = buildPod(podName) - project.logger.lifecycle("created pod: " + podName) - Pod createdPod = client.pods().inNamespace(namespace).create(podRequest) - Runtime.getRuntime().addShutdownHook({ - println "Deleting pod: " + podName - client.pods().delete(createdPod) - }) - CompletableFuture<Void> waiter = new CompletableFuture<Void>() - KubePodResult result = new KubePodResult(createdPod, waiter, outputFile) - startBuildAndLogging(client, namespace, numberOfPods, i, podName, printOutput, waiter, { int resultCode -> - println podName + " has completed with resultCode=$resultCode" - result.setResultCode(resultCode) - }, outputFile) - - return result - }, executorService) + List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj({ i -> + String podName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase() + runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3) }).collect(Collectors.toList()) - - def binaryFileFutures = podCreationFutures.collect { creationFuture -> - return creationFuture.thenComposeAsync({ podResult -> - return podResult.waiter.thenApply { - project.logger.lifecycle("Successfully terminated log streaming for " + podResult.createdPod.getMetadata().getName()) - println "Gathering test results from ${podResult.createdPod.metadata.name}" - def binaryResults = downloadTestXmlFromPod(client, namespace, podResult.createdPod) - project.logger.lifecycle("deleting: " + podResult.createdPod.getMetadata().getName()) - client.resource(podResult.createdPod).delete() - return binaryResults - } - }, singleThreadedExecutor) - } - - def allFilesDownloadedFuture = CompletableFuture.allOf(*binaryFileFutures.toArray(new CompletableFuture[0])).thenApply { - def allBinaryFiles = binaryFileFutures.collect { future -> - Collection<File> binaryFiles = future.get() - return binaryFiles - }.flatten() - this.testOutput = Collections.synchronizedList(allBinaryFiles) - return allBinaryFiles - } - - allFilesDownloadedFuture.get() - this.containerResults = podCreationFutures.collect { it -> it.get() } + this.testOutput = Collections.synchronizedList(futures.collect { it -> it.get().binaryResults }.flatten()) + this.containerResults = futures.collect { it -> it.get() } } - void startBuildAndLogging(KubernetesClient client, - String namespace, - int numberOfPods, - int podIdx, - String podName, - boolean printOutput, - CompletableFuture<Void> waiter, - Consumer<Integer> resultSetter, - File outputFileForContainer) { - try { - project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build") - client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES) - project.logger.lifecycle("pod " + podName + " has started, executing build") - Watch eventWatch = client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() { - @Override - void eventReceived(Watcher.Action action, Pod resource) { - project.logger.lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name()) - } + CompletableFuture<KubePodResult> runBuild(KubernetesClient client, + String namespace, + int numberOfPods, + int podIdx, + String podName, + boolean printOutput, + int numberOfRetries) { - @Override - void onClose(KubernetesClientException cause) { - } - }) + CompletableFuture<KubePodResult> toReturn = new CompletableFuture<KubePodResult>() - def stdOutOs = new PipedOutputStream() - def stdOutIs = new PipedInputStream(4096) - ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream(); + executorService.submit({ + int tryCount = 0 + Pod createdPod = null + while (tryCount < numberOfRetries) { + try { + Pod podRequest = buildPod(podName) + project.logger.lifecycle("requesting pod: " + podName) + createdPod = client.pods().inNamespace(namespace).create(podRequest) + project.logger.lifecycle("scheduled pod: " + podName) + File outputFile = Files.createTempFile("container", ".log").toFile() + attachStatusListenerToPod(client, namespace, podName) + schedulePodForDeleteOnShutdown(podName, client, createdPod) + waitForPodToStart(podName, client, namespace) + def stdOutOs = new PipedOutputStream() + def stdOutIs = new PipedInputStream(4096) + ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream(); + KubePodResult result = new KubePodResult(createdPod, null, outputFile) + CompletableFuture<KubePodResult> waiter = new CompletableFuture<>() + ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result) + stdOutIs.connect(stdOutOs) + ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName) + .writingOutput(stdOutOs) + .writingErrorChannel(errChannelStream) + .usingListener(execListener).exec(getBuildCommand(numberOfPods, podIdx)) - def terminatingListener = new ExecListener() { - - @Override - void onOpen(Response response) { - project.logger.lifecycle("Build started on pod " + podName) - } - - @Override - void onFailure(Throwable t, Response response) { - project.logger.lifecycle("Received error from rom pod " + podName) - waiter.completeExceptionally(t) - } - - @Override - void onClose(int code, String reason) { - project.logger.lifecycle("Received onClose() from pod " + podName + " with returnCode=" + code) + startLogPumping(outputFile, stdOutIs, podIdx, printOutput) + KubePodResult execResult = waiter.join() + project.logger.lifecycle("build has ended on on pod ${podName} (${podIdx}/${numberOfPods})") + project.logger.lifecycle "Gathering test results from ${execResult.createdPod.metadata.name}" + def binaryResults = downloadTestXmlFromPod(client, namespace, execResult.createdPod) + project.logger.lifecycle("deleting: " + execResult.createdPod.getMetadata().getName()) + client.resource(execResult.createdPod).delete() + result.binaryResults = binaryResults + toReturn.complete(result) + break + } catch (Exception e) { + logger.error("Encountered error during testing cycle on pod ${podName} (${podIdx}/${numberOfPods})", e) try { - def errChannelContents = errChannelStream.toString() - println errChannelContents - Status status = Serialization.unmarshal(errChannelContents, Status.class); - resultSetter.accept(status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0) - waiter.complete() - } catch (Exception e) { - waiter.completeExceptionally(e) + if (createdPod) { + client.pods().delete(createdPod) + while (client.pods().inNamespace(namespace).list().getItems().find { p -> p.metadata.name == podName }) { + logger.warn("pod ${podName} has not been deleted, waiting 1s") + Thread.sleep(1000) + } + } + } catch (Exception ignored) { } + tryCount++ + logger.lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times") } } + if (tryCount >= numberOfRetries) { + toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit")) + } + }) + return toReturn + } - stdOutIs.connect(stdOutOs) - - ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName) - .writingOutput(stdOutOs) - .writingErrorChannel(errChannelStream) - .usingListener(terminatingListener).exec(getBuildCommand(numberOfPods, podIdx)) - - project.logger.lifecycle("Pod: " + podName + " has started ") - - Thread loggingThread = new Thread({ -> - BufferedWriter out = null - BufferedReader br = null - try { - out = new BufferedWriter(new FileWriter(outputFileForContainer)) - br = new BufferedReader(new InputStreamReader(stdOutIs)) - String line - while ((line = br.readLine()) != null) { - def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim() - if (printOutput) { - project.logger.lifecycle(toWrite) - } - out.println(toWrite) + void startLogPumping(File outputFile, stdOutIs, podIdx, boolean printOutput) { + Thread loggingThread = new Thread({ -> + BufferedWriter out = null + BufferedReader br = null + try { + out = new BufferedWriter(new FileWriter(outputFile)) + br = new BufferedReader(new InputStreamReader(stdOutIs)) + String line + while ((line = br.readLine()) != null) { + def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim() + if (printOutput) { + project.logger.lifecycle(toWrite) } - } catch (IOException ignored) { + out.println(toWrite) } - finally { - out?.close() - br?.close() - } - }) + } catch (IOException ignored) { + } + finally { + out?.close() + br?.close() + } + }) - loggingThread.setDaemon(true) - loggingThread.start() - } catch (InterruptedException ignored) { - throw new GradleException("Could not get slot on cluster within timeout") + loggingThread.setDaemon(true) + loggingThread.start() + } + + ExecListener buildExecListenerForPod(podName, errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) { + + new ExecListener() { + @Override + void onOpen(Response response) { + project.logger.lifecycle("Build started on pod " + podName) + } + + @Override + void onFailure(Throwable t, Response response) { + project.logger.lifecycle("Received error from rom pod " + podName) + waitingFuture.completeExceptionally(t) + } + + @Override + void onClose(int code, String reason) { + project.logger.lifecycle("Received onClose() from pod " + podName + " with returnCode=" + code) + try { + def errChannelContents = errChannelStream.toString() + Status status = Serialization.unmarshal(errChannelContents, Status.class); + result.resultCode = status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0 + waitingFuture.complete(result) + } catch (Exception e) { + waitingFuture.completeExceptionally(e) + } + } } } + void schedulePodForDeleteOnShutdown(String podName, client, Pod createdPod) { + project.logger.info("attaching shutdown hook for pod ${podName}") + Runtime.getRuntime().addShutdownHook({ + println "Deleting pod: " + podName + client.pods().delete(createdPod) + }) + } + + Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) { + client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() { + @Override + void eventReceived(Watcher.Action action, Pod resource) { + project.logger.lifecycle("[StatusChange] pod ${resource.getMetadata().getName()} ${action.name()} (${resource.status.phase})") + } + + @Override + void onClose(KubernetesClientException cause) { + } + }) + } + + void waitForPodToStart(String podName, KubernetesClient client, String namespace) { + project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build") + client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES) + project.logger.lifecycle("pod " + podName + " has started, executing build") + } + Pod buildPod(String podName) { return new PodBuilder().withNewMetadata().withName(podName).endMetadata() .withNewSpec() .addNewVolume() .withName("gradlecache") .withNewHostPath() - .withPath("/gradle") + .withPath("/tmp/gradle") .withType("DirectoryOrCreate") .endHostPath() .endVolume() .addNewContainer() .withImage(dockerTag) .withCommand("bash") - //max container life time is 30min - .withArgs("-c", "sleep 1800") + .withArgs("-c", "sleep 3600") .addNewEnv() .withName("DRIVER_NODE_MEMORY") .withValue("1024m") @@ -243,8 +258,8 @@ class KubesTest extends DefaultTask { .endEnv() .withName(podName) .withNewResources() - .addToRequests("cpu", new Quantity("2")) - .addToRequests("memory", new Quantity("6Gi")) + .addToRequests("cpu", new Quantity("${numberOfCoresPerFork}")) + .addToRequests("memory", new Quantity("${memoryGbPerFork}Gi")) .endResources() .addNewVolumeMount() .withName("gradlecache") @@ -276,7 +291,7 @@ class KubesTest extends DefaultTask { tempDir.toFile().mkdirs() } - project.logger.lifecycle("saving to " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath()) + project.logger.lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath()) client.pods() .inNamespace(namespace) .withName(podName) diff --git a/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy b/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy new file mode 100644 index 0000000000..3d4edad7df --- /dev/null +++ b/buildSrc/src/main/groovy/net/corda/testing/ParallelTestGroup.groovy @@ -0,0 +1,41 @@ +package net.corda.testing + +import org.gradle.api.DefaultTask +import org.gradle.api.tasks.TaskAction + +class ParallelTestGroup extends DefaultTask { + + List<String> groups = new ArrayList<>() + int shardCount = 20 + int coresToUse = 4 + int gbOfMemory = 4 + boolean printToStdOut = true + + void numberOfShards(int shards){ + this.shardCount = shards + } + + void coresPerFork(int cores){ + this.coresToUse = cores + } + + void memoryInGbPerFork(int gb){ + this.gbOfMemory = gb + } + + //when this is false, only containers will "failed" exit codes will be printed to stdout + void streamOutput(boolean print){ + this.printToStdOut = print + } + + void testGroups(String... group) { + testGroups(group.toList()) + } + + void testGroups(List<String> group) { + group.forEach { + groups.add(it) + } + } + +} diff --git a/buildSrc/src/main/java/net/corda/testing/KubePodResult.java b/buildSrc/src/main/java/net/corda/testing/KubePodResult.java index 43377654ff..d5f725e646 100644 --- a/buildSrc/src/main/java/net/corda/testing/KubePodResult.java +++ b/buildSrc/src/main/java/net/corda/testing/KubePodResult.java @@ -3,6 +3,8 @@ package net.corda.testing; import io.fabric8.kubernetes.api.model.Pod; import java.io.File; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.CompletableFuture; public class KubePodResult { @@ -11,6 +13,7 @@ public class KubePodResult { private final CompletableFuture<Void> waiter; private volatile Integer resultCode = 255; private final File output; + private volatile Collection<File> binaryResults = Collections.emptyList(); KubePodResult(Pod createdPod, CompletableFuture<Void> waiter, File output) { this.createdPod = createdPod; diff --git a/buildSrc/src/main/java/net/corda/testing/KubesReporting.java b/buildSrc/src/main/java/net/corda/testing/KubesReporting.java index b54e73759d..ffd55b20f3 100644 --- a/buildSrc/src/main/java/net/corda/testing/KubesReporting.java +++ b/buildSrc/src/main/java/net/corda/testing/KubesReporting.java @@ -16,6 +16,7 @@ package net.corda.testing; +import org.apache.commons.compress.utils.IOUtils; import org.gradle.api.DefaultTask; import org.gradle.api.GradleException; import org.gradle.api.Transformer; @@ -33,6 +34,8 @@ import org.gradle.internal.operations.BuildOperationExecutor; import javax.inject.Inject; import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -49,6 +52,7 @@ public class KubesReporting extends DefaultTask { private File destinationDir = new File(getProject().getBuildDir(), "test-reporting"); private List<Object> results = new ArrayList<Object>(); List<KubePodResult> podResults = new ArrayList<>(); + boolean shouldPrintOutput = true; public KubesReporting() { //force this task to always run, as it's responsible for parsing exit codes @@ -147,12 +151,17 @@ public class KubesReporting extends DefaultTask { if (!containersWithNonZeroReturnCodes.isEmpty()) { String reportUrl = new ConsoleRenderer().asClickableFileUrl(new File(destinationDir, "index.html")); - - String containerOutputs = containersWithNonZeroReturnCodes.stream().map(KubePodResult::getOutput).map(file -> new ConsoleRenderer().asClickableFileUrl(file)).reduce("", - (s, s2) -> s + "\n" + s2 - ); - - String message = "remote build failed, check test report at " + reportUrl + "\n and container outputs at " + containerOutputs; + if (shouldPrintOutput){ + containersWithNonZeroReturnCodes.forEach(container -> { + try { + System.out.println("\n##### CONTAINER OUTPUT START #####"); + IOUtils.copy(new FileInputStream(container.getOutput()), System.out); + System.out.println("##### CONTAINER OUTPUT END #####\n"); + } catch (IOException ignored) { + } + }); + } + String message = "remote build failed, check test report at " + reportUrl; throw new GradleException(message); } } else {