WIP Kubenetes parallel build (#5396)

* Split integration tests

* add simple example of printing all methods annotated with @Test

* add docker plugin to root project
remove docker plugin from child projects
add Dockerfile for image to use when testing
add task to build testing image to root project

* add comment describing proposed testing workflow

* simple attempt at running tests in docker container

* add my first k8s interaction script

* add fabric8 as dependnency to buildSrc

* before adding classpath

* collect reports from containers and run through testReports

* re-enable kubes backed testing

* for each project
1. add a list tests task
2. use this list tests task to modify the included tests
3. add a parallel version of the test task

* tweak logic for downloading test report XML files

* use output of parallel testing tasks in report tasks to determine build resultCode

* prepare for jenkins test

* prepare for jenkins test

* make docker reg password system property

* add logging to print out docker reg creds

* enable docker build

* fix gradle build file

* gather xml files into root project

* change log level for gradle modification

* stop printing gradle docker push passwd

* tidy up report generation

* fix compilation errors

* split signature constraints test into two

* change Sig constraint tests type hierarchy

* tidy up build.gradle

* try method based test includes

* add unit test for test listing

* fix  bug with test slicing

* stop filtering ignored tests to make the numbers match existing runs

* change log level to ensure print out

* move all plugin logic to buildSrc files

* tidy up test modification
add comments to explain what DistributedTesting plugin does

* move new plugins into properly named packages

* tidy up runConfigs

* fix compile errors due to merge with slow-integration-test work

* add system parameter to enable / disable build modification

* add -Dkubenetise to build command

* address review comments

* type safe declaration of parameters in KubesTest
This commit is contained in:
Stefano Franz 2019-09-03 15:40:08 +00:00 committed by GitHub
parent 99f4e4aac2
commit a842740c9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 1545 additions and 356 deletions

7
.dockerignore Normal file
View File

@ -0,0 +1,7 @@
.git
.cache
.idea
.ci
.github
.bootstrapper
**/*.class

1
.gitignore vendored
View File

@ -41,7 +41,6 @@ lib/quasar.jar
# Include the -parameters compiler option by default in IntelliJ required for serialization.
!.idea/compiler.xml
!.idea/codeStyleSettings.xml
# if you remove the above rule, at least ignore the following:

View File

@ -1,3 +1,5 @@
import net.corda.testing.DistributedTesting
buildscript {
// For sharing constants between builds
Properties constants = new Properties()
@ -16,25 +18,25 @@ buildscript {
ext.quasar_group = 'co.paralleluniverse'
ext.quasar_version = constants.getProperty("quasarVersion")
ext.quasar_exclusions = [
'co.paralleluniverse**',
'groovy**',
'com.esotericsoftware.**',
'jdk**',
'junit**',
'kotlin**',
'net.rubygrapefruit.**',
'org.gradle.**',
'org.apache.**',
'org.jacoco.**',
'org.junit**',
'org.slf4j**',
'worker.org.gradle.**',
'com.nhaarman.mockito_kotlin**',
'org.assertj**',
'org.hamcrest**',
'org.mockito**',
'org.opentest4j**'
]
'co.paralleluniverse**',
'groovy**',
'com.esotericsoftware.**',
'jdk**',
'junit**',
'kotlin**',
'net.rubygrapefruit.**',
'org.gradle.**',
'org.apache.**',
'org.jacoco.**',
'org.junit**',
'org.slf4j**',
'worker.org.gradle.**',
'com.nhaarman.mockito_kotlin**',
'org.assertj**',
'org.hamcrest**',
'org.mockito**',
'org.opentest4j**'
]
// gradle-capsule-plugin:1.0.2 contains capsule:1.0.1 by default.
// We must configure it manually to use the latest capsule version.
@ -98,7 +100,7 @@ buildscript {
ext.jsch_version = '0.1.55'
ext.protonj_version = '0.33.0' // Overide Artemis version
ext.snappy_version = '0.4'
ext.class_graph_version = '4.8.41'
ext.class_graph_version = constants.getProperty('classgraphVersion')
ext.jcabi_manifests_version = '1.1'
ext.picocli_version = '3.9.6'
ext.commons_io_version = '2.6'
@ -113,7 +115,14 @@ buildscript {
// Updates [131, 161] also have zip compression bugs on MacOS (High Sierra).
// when the java version in NodeStartup.hasMinimumJavaVersion() changes, so must this check
ext.java8_minUpdateVersion = constants.getProperty('java8MinUpdateVersion')
ext.corda_revision = {
try {
"git rev-parse HEAD".execute().text.trim()
} catch (Exception ignored) {
logger.warn("git is unavailable in build environment")
"unknown"
}
}()
repositories {
mavenLocal()
mavenCentral()
@ -152,10 +161,10 @@ plugins {
// Add the shadow plugin to the plugins classpath for the entire project.
id 'com.github.johnrengelman.shadow' version '2.0.4' apply false
id "com.gradle.build-scan" version "2.2.1"
id 'com.bmuschko.docker-remote-api'
}
ext {
corda_revision = "git rev-parse HEAD".execute().text.trim()
}
apply plugin: 'project-report'
@ -172,7 +181,6 @@ apply plugin: 'java'
sourceCompatibility = 1.8
targetCompatibility = 1.8
allprojects {
apply plugin: 'kotlin'
apply plugin: 'jacoco'
@ -250,14 +258,14 @@ allprojects {
ex.append = false
}
maxParallelForks = (System.env.CORDA_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_TESTING_FORKS".toInteger()
maxParallelForks = (System.env.CORDA_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_TESTING_FORKS".toInteger()
systemProperty 'java.security.egd', 'file:/dev/./urandom'
}
tasks.withType(Test){
if (name.contains("integrationTest")){
maxParallelForks = (System.env.CORDA_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_INT_TESTING_FORKS".toInteger()
tasks.withType(Test) {
if (name.contains("integrationTest")) {
maxParallelForks = (System.env.CORDA_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_INT_TESTING_FORKS".toInteger()
}
}
@ -496,8 +504,6 @@ if (file('corda-docs-only-build').exists() || (System.getenv('CORDA_DOCS_ONLY_BU
}
}
wrapper {
gradleVersion = "5.4.1"
distributionType = Wrapper.DistributionType.ALL
@ -507,3 +513,5 @@ buildScan {
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
termsOfServiceAgree = 'yes'
}
apply plugin: DistributedTesting

View File

@ -4,6 +4,7 @@ buildscript {
ext {
guava_version = constants.getProperty("guavaVersion")
class_graph_version = constants.getProperty('classgraphVersion')
assertj_version = '3.9.1'
junit_version = '4.12'
}
@ -12,6 +13,7 @@ buildscript {
repositories {
mavenLocal()
mavenCentral()
jcenter()
}
allprojects {
@ -30,4 +32,11 @@ dependencies {
runtime project.childProjects.collect { n, p ->
project(p.path)
}
compile gradleApi()
compile "io.fabric8:kubernetes-client:4.4.1"
compile 'org.apache.commons:commons-compress:1.19'
compile 'commons-codec:commons-codec:1.13'
compile "io.github.classgraph:classgraph:$class_graph_version"
compile "com.bmuschko:gradle-docker-plugin:5.0.0"
testCompile "junit:junit:$junit_version"
}

View File

@ -0,0 +1,177 @@
package net.corda.testing
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import org.gradle.api.Plugin
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.tasks.testing.Test
/**
This plugin is responsible for wiring together the various components of test task modification
*/
class DistributedTesting implements Plugin<Project> {
static def getPropertyAsInt(Project proj, String property, Integer defaultValue) {
return proj.hasProperty(property) ? Integer.parseInt(proj.property(property).toString()) : defaultValue
}
@Override
void apply(Project project) {
if (System.getProperty("kubenetize") != null) {
ensureImagePluginIsApplied(project)
ImageBuilding imagePlugin = project.plugins.getPlugin(ImageBuilding)
DockerPushImage imageBuildingTask = imagePlugin.pushTask
//in each subproject
//1. add the task to determine all tests within the module
//2. modify the underlying testing task to use the output of the listing task to include a subset of tests for each fork
//3. KubesTest will invoke these test tasks in a parallel fashion on a remote k8s cluster
project.subprojects { Project subProject ->
subProject.tasks.withType(Test) { Test task ->
ListTests testListerTask = createTestListingTasks(task, subProject)
Test modifiedTestTask = modifyTestTaskForParallelExecution(subProject, task, testListerTask)
KubesTest parallelTestTask = generateParallelTestingTask(subProject, task, imageBuildingTask)
}
}
//now we are going to create "super" groupings of these KubesTest tasks, so that it is possible to invoke all submodule tests with a single command
//group all kubes tests by their underlying target task (test/integrationTest/smokeTest ... etc)
Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType = project.subprojects.collect { prj -> prj.getAllTasks(false).values() }
.flatten()
.findAll { task -> task instanceof KubesTest }
.groupBy { task -> task.taskToExecuteName }
//first step is to create a single task which will invoke all the submodule tasks for each grouping
//ie allParallelTest will invoke [node:test, core:test, client:rpc:test ... etc]
//ie allIntegrationTest will invoke [node:integrationTest, core:integrationTest, client:rpc:integrationTest ... etc]
createGroupedParallelTestTasks(allKubesTestingTasksGroupedByType, project, imageBuildingTask)
}
}
private List<Task> createGroupedParallelTestTasks(Map<String, List<KubesTest>> allKubesTestingTasksGroupedByType, Project project, DockerPushImage imageBuildingTask) {
allKubesTestingTasksGroupedByType.entrySet().collect { entry ->
def taskType = entry.key
def allTasksOfType = entry.value
def allParallelTask = project.rootProject.tasks.create("allParallel" + taskType.capitalize(), KubesTest) {
dependsOn imageBuildingTask
printOutput = true
fullTaskToExecutePath = allTasksOfType.collect { task -> task.fullTaskToExecutePath }.join(" ")
taskToExecuteName = taskType
doFirst {
dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()
}
}
//second step is to create a task to use the reports output by the parallel test task
def reportOnAllTask = project.rootProject.tasks.create("reportAllParallel${taskType.capitalize()}", KubesReporting) {
dependsOn allParallelTask
destinationDir new File(project.rootProject.getBuildDir(), "allResults${taskType.capitalize()}")
doFirst {
destinationDir.deleteDir()
podResults = allParallelTask.containerResults
reportOn(allParallelTask.testOutput)
}
}
//invoke this report task after parallel testing
allParallelTask.finalizedBy(reportOnAllTask)
project.logger.info "Created task: ${allParallelTask.getPath()} to enable testing on kubenetes for tasks: ${allParallelTask.fullTaskToExecutePath}"
project.logger.info "Created task: ${reportOnAllTask.getPath()} to generate test html output for task ${allParallelTask.getPath()}"
return allParallelTask
}
}
private KubesTest generateParallelTestingTask(Project projectContainingTask, Test task, DockerPushImage imageBuildingTask) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()
KubesTest createdParallelTestTask = projectContainingTask.tasks.create("parallel" + capitalizedTaskName, KubesTest) {
dependsOn imageBuildingTask
printOutput = true
fullTaskToExecutePath = task.getPath()
taskToExecuteName = taskName
doFirst {
dockerTag = imageBuildingTask.imageName.get() + ":" + imageBuildingTask.tag.get()
}
}
projectContainingTask.logger.info "Created task: ${createdParallelTestTask.getPath()} to enable testing on kubenetes for task: ${task.getPath()}"
return createdParallelTestTask as KubesTest
}
private Test modifyTestTaskForParallelExecution(Project subProject, Test task, ListTests testListerTask) {
subProject.logger.info("modifying task: ${task.getPath()} to depend on task ${testListerTask.getPath()}")
def reportsDir = new File(new File(subProject.rootProject.getBuildDir(), "test-reports"), subProject.name + "-" + task.name)
task.configure {
dependsOn testListerTask
binResultsDir new File(reportsDir, "binary")
reports.junitXml.destination new File(reportsDir, "xml")
maxHeapSize = "6g"
doFirst {
filter {
def fork = getPropertyAsInt(subProject, "dockerFork", 0)
def forks = getPropertyAsInt(subProject, "dockerForks", 1)
def shuffleSeed = 42
subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (${fork}, ${forks}, ${shuffleSeed})")
List<String> includes = testListerTask.getTestsForFork(
fork,
forks,
shuffleSeed)
subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}"
if (includes.size() == 0) {
subProject.logger.info "Disabling test execution for testing task ${task.getPath()}"
excludeTestsMatching "*"
}
includes.forEach { include ->
subProject.logger.info "including: $include for testing task ${task.getPath()}"
includeTestsMatching include
}
failOnNoMatchingTests false
}
}
}
return task
}
private static void ensureImagePluginIsApplied(Project project) {
project.plugins.apply(ImageBuilding)
}
private ListTests createTestListingTasks(Test task, Project subProject) {
def taskName = task.getName()
def capitalizedTaskName = task.getName().capitalize()
//determine all the tests which are present in this test task.
//this list will then be shared between the various worker forks
def createdListTask = subProject.tasks.create("listTestsFor" + capitalizedTaskName, ListTests) {
//the convention is that a testing task is backed by a sourceSet with the same name
dependsOn subProject.getTasks().getByName("${taskName}Classes")
doFirst {
//we want to set the test scanning classpath to only the output of the sourceSet - this prevents dependencies polluting the list
scanClassPath = task.getTestClassesDirs() ? task.getTestClassesDirs() : Collections.emptyList()
}
}
//convenience task to utilize the output of the test listing task to display to local console, useful for debugging missing tests
def createdPrintTask = subProject.tasks.create("printTestsFor" + capitalizedTaskName) {
dependsOn createdListTask
doLast {
createdListTask.getTestsForFork(
getPropertyAsInt(subProject, "dockerFork", 0),
getPropertyAsInt(subProject, "dockerForks", 1),
42).forEach { testName ->
println testName
}
}
}
subProject.logger.info("created task: " + createdListTask.getPath() + " in project: " + subProject + " it dependsOn: " + createdListTask.dependsOn)
subProject.logger.info("created task: " + createdPrintTask.getPath() + " in project: " + subProject + " it dependsOn: " + createdPrintTask.dependsOn)
return createdListTask as ListTests
}
}

View File

@ -0,0 +1,101 @@
package net.corda.testing
import com.bmuschko.gradle.docker.DockerRegistryCredentials
import com.bmuschko.gradle.docker.tasks.container.DockerCreateContainer
import com.bmuschko.gradle.docker.tasks.container.DockerLogsContainer
import com.bmuschko.gradle.docker.tasks.container.DockerStartContainer
import com.bmuschko.gradle.docker.tasks.container.DockerWaitContainer
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import com.bmuschko.gradle.docker.tasks.image.DockerCommitImage
import com.bmuschko.gradle.docker.tasks.image.DockerPushImage
import com.bmuschko.gradle.docker.tasks.image.DockerTagImage
import org.gradle.api.Plugin
import org.gradle.api.Project
/**
this plugin is responsible for setting up all the required docker image building tasks required for producing and pushing an
image of the current build output to a remote container registry
*/
class ImageBuilding implements Plugin<Project> {
DockerPushImage pushTask
@Override
void apply(Project project) {
DockerBuildImage buildDockerImageForSource = project.tasks.create('buildDockerImageForSource', DockerBuildImage) {
dependsOn project.rootProject.getTasksByName("clean", true)
inputDir.set(new File("."))
dockerFile.set(new File(new File("testing"), "Dockerfile"))
}
DockerCreateContainer createBuildContainer = project.tasks.create('createBuildContainer', DockerCreateContainer) {
File gradleDir = new File((System.getProperty("java.io.tmpdir") + File.separator + "gradle"))
File mavenDir = new File((System.getProperty("java.io.tmpdir") + File.separator + "maven"))
doFirst {
if (!gradleDir.exists()) {
gradleDir.mkdirs()
}
if (!mavenDir.exists()) {
mavenDir.mkdirs()
}
}
dependsOn buildDockerImageForSource
targetImageId buildDockerImageForSource.getImageId()
binds = [(gradleDir.absolutePath): "/tmp/gradle", (mavenDir.absolutePath): "/home/root/.m2"]
}
DockerStartContainer startBuildContainer = project.tasks.create('startBuildContainer', DockerStartContainer) {
dependsOn createBuildContainer
targetContainerId createBuildContainer.getContainerId()
}
DockerLogsContainer logBuildContainer = project.tasks.create('logBuildContainer', DockerLogsContainer) {
dependsOn startBuildContainer
targetContainerId createBuildContainer.getContainerId()
follow = true
}
DockerWaitContainer waitForBuildContainer = project.tasks.create('waitForBuildContainer', DockerWaitContainer) {
dependsOn logBuildContainer
targetContainerId createBuildContainer.getContainerId()
}
DockerCommitImage commitBuildImageResult = project.tasks.create('commitBuildImageResult', DockerCommitImage) {
dependsOn waitForBuildContainer
targetContainerId createBuildContainer.getContainerId()
}
DockerTagImage tagBuildImageResult = project.tasks.create('tagBuildImageResult', DockerTagImage) {
dependsOn commitBuildImageResult
imageId = commitBuildImageResult.getImageId()
tag = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
repository = "stefanotestingcr.azurecr.io/testing"
}
def registryCredentialsForPush = new DockerRegistryCredentials(project.getObjects())
registryCredentialsForPush.username.set("stefanotestingcr")
registryCredentialsForPush.password.set(System.getProperty("docker.push.password") ? System.getProperty("docker.push.password") : "")
if (System.getProperty("docker.tag")) {
DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) {
doFirst {
registryCredentials = registryCredentialsForPush
}
imageName = "stefanotestingcr.azurecr.io/testing"
tag = System.getProperty("docker.tag")
}
this.pushTask = pushBuildImage
} else {
DockerPushImage pushBuildImage = project.tasks.create('pushBuildImage', DockerPushImage) {
dependsOn tagBuildImageResult
doFirst {
registryCredentials = registryCredentialsForPush
}
imageName = "stefanotestingcr.azurecr.io/testing"
tag = tagBuildImageResult.tag
}
this.pushTask = pushBuildImage
}
}
}

View File

@ -0,0 +1,299 @@
package net.corda.testing
import io.fabric8.kubernetes.api.model.*
import io.fabric8.kubernetes.client.*
import io.fabric8.kubernetes.client.dsl.ExecListener
import io.fabric8.kubernetes.client.dsl.ExecWatch
import io.fabric8.kubernetes.client.utils.Serialization
import okhttp3.Response
import org.gradle.api.DefaultTask
import org.gradle.api.GradleException
import org.gradle.api.tasks.TaskAction
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.IntStream
class KubesTest extends DefaultTask {
static final ExecutorService executorService = Executors.newCachedThreadPool()
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor()
String dockerTag
String fullTaskToExecutePath
String taskToExecuteName
Boolean printOutput = false
public volatile List<File> testOutput = Collections.emptyList()
public volatile List<KubePodResult> containerResults = Collections.emptyList()
String namespace = "thisisatest"
int k8sTimeout = 50 * 1_000
int webSocketTimeout = k8sTimeout * 6
int numberOfPods = 20
int timeoutInMinutesForPodToStart = 60
@TaskAction
void runTestsOnKubes() {
try {
Class.forName("org.apache.commons.compress.archivers.tar.TarArchiveInputStream")
} catch (ClassNotFoundException ignored) {
throw new GradleException("Apache Commons compress has not be loaded, this can happen if running from within intellj - please select \"delegate to gradle\" for build and test actions")
}
def gitSha = new BigInteger(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0", 36)
def buildId = System.hasProperty("buildId") ? System.getProperty("buildId") : "UNKNOWN_BUILD"
def currentUser = System.hasProperty("user.name") ? System.getProperty("user.name") : "UNKNOWN_USER"
String stableRunId = new BigInteger(64, new Random(gitSha.intValue() + buildId.hashCode() + currentUser.hashCode())).toString(36).toLowerCase()
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase()
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
.withConnectionTimeout(k8sTimeout)
.withRequestTimeout(k8sTimeout)
.withRollingTimeout(k8sTimeout)
.withWebsocketTimeout(webSocketTimeout)
.withWebsocketPingInterval(webSocketTimeout)
.build()
final KubernetesClient client = new DefaultKubernetesClient(config)
client.pods().inNamespace(namespace).list().getItems().forEach({ podToDelete ->
if (podToDelete.getMetadata().name.contains(stableRunId)) {
project.logger.lifecycle("deleting: " + podToDelete.getMetadata().getName())
client.resource(podToDelete).delete()
}
})
Namespace ns = new NamespaceBuilder().withNewMetadata().withName(namespace).addToLabels("testing-env", "true").endMetadata().build()
client.namespaces().createOrReplace(ns)
List<CompletableFuture<KubePodResult>> podCreationFutures = IntStream.range(0, numberOfPods).mapToObj({ i ->
CompletableFuture.supplyAsync({
File outputFile = Files.createTempFile("container", ".log").toFile()
String podName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase()
Pod podRequest = buildPod(podName)
project.logger.lifecycle("created pod: " + podName)
Pod createdPod = client.pods().inNamespace(namespace).create(podRequest)
Runtime.getRuntime().addShutdownHook({
println "Deleting pod: " + podName
client.pods().delete(createdPod)
})
CompletableFuture<Void> waiter = new CompletableFuture<Void>()
KubePodResult result = new KubePodResult(createdPod, waiter, outputFile)
startBuildAndLogging(client, namespace, numberOfPods, i, podName, printOutput, waiter, { int resultCode ->
println podName + " has completed with resultCode=$resultCode"
result.setResultCode(resultCode)
}, outputFile)
return result
}, executorService)
}).collect(Collectors.toList())
def binaryFileFutures = podCreationFutures.collect { creationFuture ->
return creationFuture.thenComposeAsync({ podResult ->
return podResult.waiter.thenApply {
project.logger.lifecycle("Successfully terminated log streaming for " + podResult.createdPod.getMetadata().getName())
println "Gathering test results from ${podResult.createdPod.metadata.name}"
def binaryResults = downloadTestXmlFromPod(client, namespace, podResult.createdPod)
project.logger.lifecycle("deleting: " + podResult.createdPod.getMetadata().getName())
client.resource(podResult.createdPod).delete()
return binaryResults
}
}, singleThreadedExecutor)
}
def allFilesDownloadedFuture = CompletableFuture.allOf(*binaryFileFutures.toArray(new CompletableFuture[0])).thenApply {
def allBinaryFiles = binaryFileFutures.collect { future ->
Collection<File> binaryFiles = future.get()
return binaryFiles
}.flatten()
this.testOutput = Collections.synchronizedList(allBinaryFiles)
return allBinaryFiles
}
allFilesDownloadedFuture.get()
this.containerResults = podCreationFutures.collect { it -> it.get() }
}
void startBuildAndLogging(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
CompletableFuture<Void> waiter,
Consumer<Integer> resultSetter,
File outputFileForContainer) {
try {
project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build")
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES)
project.logger.lifecycle("pod " + podName + " has started, executing build")
Watch eventWatch = client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
@Override
void eventReceived(Watcher.Action action, Pod resource) {
project.logger.lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name())
}
@Override
void onClose(KubernetesClientException cause) {
}
})
def stdOutOs = new PipedOutputStream()
def stdOutIs = new PipedInputStream(4096)
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
def terminatingListener = new ExecListener() {
@Override
void onOpen(Response response) {
project.logger.lifecycle("Build started on pod " + podName)
}
@Override
void onFailure(Throwable t, Response response) {
project.logger.lifecycle("Received error from rom pod " + podName)
waiter.completeExceptionally(t)
}
@Override
void onClose(int code, String reason) {
project.logger.lifecycle("Received onClose() from pod " + podName + " with returnCode=" + code)
try {
def errChannelContents = errChannelStream.toString()
println errChannelContents
Status status = Serialization.unmarshal(errChannelContents, Status.class);
resultSetter.accept(status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0)
waiter.complete()
} catch (Exception e) {
waiter.completeExceptionally(e)
}
}
}
stdOutIs.connect(stdOutOs)
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(terminatingListener).exec(getBuildCommand(numberOfPods, podIdx))
project.logger.lifecycle("Pod: " + podName + " has started ")
Thread loggingThread = new Thread({ ->
BufferedWriter out = null
BufferedReader br = null
try {
out = new BufferedWriter(new FileWriter(outputFileForContainer))
br = new BufferedReader(new InputStreamReader(stdOutIs))
String line
while ((line = br.readLine()) != null) {
def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim()
if (printOutput) {
project.logger.lifecycle(toWrite)
}
out.println(toWrite)
}
} catch (IOException ignored) {
}
finally {
out?.close()
br?.close()
}
})
loggingThread.setDaemon(true)
loggingThread.start()
} catch (InterruptedException ignored) {
throw new GradleException("Could not get slot on cluster within timeout")
}
}
Pod buildPod(String podName) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withPath("/gradle")
.withType("DirectoryOrCreate")
.endHostPath()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
//max container life time is 30min
.withArgs("-c", "sleep 1800")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity("2"))
.addToRequests("memory", new Quantity("6Gi"))
.endResources()
.addNewVolumeMount()
.withName("gradlecache")
.withMountPath("/tmp/gradle")
.endVolumeMount()
.endContainer()
.withImagePullSecrets(new LocalObjectReference("regcred"))
.withRestartPolicy("Never")
.endSpec()
.build()
}
String[] getBuildCommand(int numberOfPods, int podIdx) {
return ["bash", "-c", "cd /tmp/source && ./gradlew -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " $fullTaskToExecutePath --info 2>&1 " +
"; let rs=\$? ; sleep 10 ; exit \${rs}"]
}
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports"
String binaryResultsFile = "results.bin"
String podName = cp.getMetadata().getName()
Path tempDir = new File(new File(project.getBuildDir(), "test-results-xml"), podName).toPath()
if (!tempDir.toFile().exists()) {
tempDir.toFile().mkdirs()
}
project.logger.lifecycle("saving to " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath())
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir)
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile)
}
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start))
List<File> folders = new ArrayList<>()
while (!filesToInspect.isEmpty()) {
File fileToInspect = filesToInspect.poll()
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
folders.add(fileToInspect.parentFile)
}
if (fileToInspect.isDirectory()) {
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()))
}
}
return folders
}
}

View File

@ -0,0 +1,75 @@
package net.corda.testing
import io.github.classgraph.ClassGraph
import io.github.classgraph.ClassInfo
import org.gradle.api.DefaultTask
import org.gradle.api.file.FileCollection
import org.gradle.api.tasks.TaskAction
import java.util.stream.Collectors
class ListShufflerAndAllocator {
private final List<String> tests
public ListShufflerAndAllocator(List<String> tests) {
this.tests = new ArrayList<>(tests)
}
List<String> getTestsForFork(int fork, int forks, Integer seed) {
Random shuffler = new Random(seed);
List<String> copy = new ArrayList<>(tests);
while (copy.size() < forks) {
//pad the list
copy.add(null);
}
Collections.shuffle(copy, shuffler);
int numberOfTestsPerFork = Math.max((copy.size() / forks).intValue(), 1);
int consumedTests = numberOfTestsPerFork * forks;
int ourStartIdx = numberOfTestsPerFork * fork;
int ourEndIdx = ourStartIdx + numberOfTestsPerFork;
int ourSupplementaryIdx = consumedTests + fork;
ArrayList<String> toReturn = new ArrayList<>(copy.subList(ourStartIdx, ourEndIdx));
if (ourSupplementaryIdx < copy.size()) {
toReturn.add(copy.get(ourSupplementaryIdx));
}
return toReturn.stream().filter { it -> it != null }.collect(Collectors.toList());
}
}
class ListTests extends DefaultTask {
FileCollection scanClassPath
List<String> allTests
def getTestsForFork(int fork, int forks, Integer seed) {
def gitSha = new BigInteger(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0", 36)
if (fork >= forks) {
throw new IllegalArgumentException("requested shard ${fork + 1} for total shards ${forks}")
}
def seedToUse = seed ? (seed + ((String) this.getPath()).hashCode() + gitSha.intValue()) : 0
return new ListShufflerAndAllocator(allTests).getTestsForFork(fork, forks, seedToUse)
}
@TaskAction
def discoverTests() {
Collection<String> results = new ClassGraph()
.enableClassInfo()
.enableMethodInfo()
.ignoreClassVisibility()
.ignoreMethodVisibility()
.enableAnnotationInfo()
.overrideClasspath(scanClassPath)
.scan()
.getClassesWithMethodAnnotation("org.junit.Test")
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
.flatten()
.collect { ClassInfo c ->
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name }
}.flatten()
.toSet()
this.allTests = results.stream().sorted().collect(Collectors.toList())
}
}

View File

@ -0,0 +1,32 @@
package net.corda.testing
import org.gradle.api.Action
import org.gradle.api.DefaultTask
import org.gradle.api.Task
import org.gradle.api.tasks.TaskAction
import java.util.concurrent.CompletableFuture
class RunInParallel extends DefaultTask {
private List<Task> tasksToRunInParallel = new ArrayList<>()
public RunInParallel runInParallel(Task... tasks) {
for (Task task : tasks) {
tasksToRunInParallel.add(task)
}
return this;
}
@TaskAction
def void run() {
tasksToRunInParallel.collect { t ->
CompletableFuture.runAsync {
def actions = t.getActions()
for (Action action : actions) {
action.execute(t)
}
}
}.join()
}
}

View File

@ -0,0 +1,38 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import java.io.File;
import java.util.concurrent.CompletableFuture;
public class KubePodResult {
private final Pod createdPod;
private final CompletableFuture<Void> waiter;
private volatile Integer resultCode = 255;
private final File output;
KubePodResult(Pod createdPod, CompletableFuture<Void> waiter, File output) {
this.createdPod = createdPod;
this.waiter = waiter;
this.output = output;
}
public void setResultCode(Integer code) {
synchronized (createdPod) {
this.resultCode = code;
}
}
public Integer getResultCode() {
synchronized (createdPod) {
return this.resultCode;
}
}
public File getOutput() {
synchronized (createdPod) {
return output;
}
}
};

View File

@ -0,0 +1,181 @@
/*
* Copyright 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.corda.testing;
import org.gradle.api.DefaultTask;
import org.gradle.api.GradleException;
import org.gradle.api.Transformer;
import org.gradle.api.file.FileCollection;
import org.gradle.api.internal.file.UnionFileCollection;
import org.gradle.api.internal.tasks.testing.junit.result.AggregateTestResultsProvider;
import org.gradle.api.internal.tasks.testing.junit.result.BinaryResultBackedTestResultsProvider;
import org.gradle.api.internal.tasks.testing.junit.result.TestResultsProvider;
import org.gradle.api.internal.tasks.testing.report.DefaultTestReport;
import org.gradle.api.tasks.OutputDirectory;
import org.gradle.api.tasks.TaskAction;
import org.gradle.api.tasks.testing.Test;
import org.gradle.internal.logging.ConsoleRenderer;
import org.gradle.internal.operations.BuildOperationExecutor;
import org.gradle.internal.operations.BuildOperationFailure;
import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import static org.gradle.internal.concurrent.CompositeStoppable.stoppable;
import static org.gradle.util.CollectionUtils.collect;
/**
* Shameful copy of org.gradle.api.tasks.testing.TestReport - modified to handle results from k8s testing.
* see https://docs.gradle.org/current/dsl/org.gradle.api.tasks.testing.TestReport.html
*/
public class KubesReporting extends DefaultTask {
private File destinationDir = new File(getProject().getBuildDir(), "test-reporting");
private List<Object> results = new ArrayList<Object>();
List<KubePodResult> podResults = new ArrayList<>();
@Inject
protected BuildOperationExecutor getBuildOperationExecutor() {
throw new UnsupportedOperationException();
}
/**
* Returns the directory to write the HTML report to.
*/
@OutputDirectory
public File getDestinationDir() {
return destinationDir;
}
/**
* Sets the directory to write the HTML report to.
*/
public void setDestinationDir(File destinationDir) {
this.destinationDir = destinationDir;
}
/**
* Returns the set of binary test results to include in the report.
*/
public FileCollection getTestResultDirs() {
UnionFileCollection dirs = new UnionFileCollection();
for (Object result : results) {
addTo(result, dirs);
}
return dirs;
}
private void addTo(Object result, UnionFileCollection dirs) {
if (result instanceof Test) {
Test test = (Test) result;
dirs.addToUnion(getProject().files(test.getBinResultsDir()).builtBy(test));
} else if (result instanceof Iterable<?>) {
Iterable<?> iterable = (Iterable<?>) result;
for (Object nested : iterable) {
addTo(nested, dirs);
}
} else {
dirs.addToUnion(getProject().files(result));
}
}
/**
* Sets the binary test results to use to include in the report. Each entry must point to a binary test results directory generated by a {@link Test}
* task.
*/
public void setTestResultDirs(Iterable<File> testResultDirs) {
this.results.clear();
reportOn(testResultDirs);
}
/**
* Adds some results to include in the report.
*
* <p>This method accepts any parameter of the given types:
*
* <ul>
*
* <li>A {@link Test} task instance. The results from the test task are included in the report. The test task is automatically added
* as a dependency of this task.</li>
*
* <li>Anything that can be converted to a set of {@link File} instances as per {@link org.gradle.api.Project#files(Object...)}. These must
* point to the binary test results directory generated by a {@link Test} task instance.</li>
*
* <li>An {@link Iterable}. The contents of the iterable are converted recursively.</li>
*
* </ul>
*
* @param results The result objects.
*/
public void reportOn(Object... results) {
for (Object result : results) {
this.results.add(result);
}
}
@TaskAction
void generateReport() {
TestResultsProvider resultsProvider = createAggregateProvider();
try {
if (resultsProvider.isHasResults()) {
DefaultTestReport testReport = new DefaultTestReport(getBuildOperationExecutor());
testReport.generateReport(resultsProvider, getDestinationDir());
List<KubePodResult> containersWithNonZeroReturnCodes = podResults.stream()
.filter(result -> result.getResultCode() != 0)
.collect(Collectors.toList());
if (!containersWithNonZeroReturnCodes.isEmpty()) {
String reportUrl = new ConsoleRenderer().asClickableFileUrl(new File(destinationDir, "index.html"));
String containerOutputs = containersWithNonZeroReturnCodes.stream().map(KubePodResult::getOutput).map(file -> new ConsoleRenderer().asClickableFileUrl(file)).reduce("",
(s, s2) -> s + "\n" + s2
);
String message = "remote build failed, check test report at " + reportUrl + "\n and container outputs at " + containerOutputs;
throw new GradleException(message);
}
} else {
getLogger().info("{} - no binary test results found in dirs: {}.", getPath(), getTestResultDirs().getFiles());
setDidWork(false);
}
} finally {
stoppable(resultsProvider).stop();
}
}
public TestResultsProvider createAggregateProvider() {
List<TestResultsProvider> resultsProviders = new LinkedList<TestResultsProvider>();
try {
FileCollection resultDirs = getTestResultDirs();
if (resultDirs.getFiles().size() == 1) {
return new BinaryResultBackedTestResultsProvider(resultDirs.getSingleFile());
} else {
return new AggregateTestResultsProvider(collect(resultDirs, resultsProviders, new Transformer<TestResultsProvider, File>() {
public TestResultsProvider transform(File dir) {
return new BinaryResultBackedTestResultsProvider(dir);
}
}));
}
} catch (RuntimeException e) {
stoppable(resultsProviders).stop();
throw e;
}
}
}

View File

@ -0,0 +1,38 @@
package net.corda.testing
import org.hamcrest.CoreMatchers
import org.junit.Assert
import org.junit.Test
import java.util.stream.Collectors
import java.util.stream.IntStream
import static org.hamcrest.core.Is.is
import static org.hamcrest.core.IsEqual.equalTo
class ListTestsTest {
@Test
void shouldAllocateTests() {
for (int numberOfTests = 0; numberOfTests < 100; numberOfTests++) {
for (int numberOfForks = 1; numberOfForks < 100; numberOfForks++) {
List<String> tests = IntStream.range(0, numberOfTests).collect { z -> "Test.method" + z }
ListShufflerAndAllocator testLister = new ListShufflerAndAllocator(tests);
List<String> listOfLists = new ArrayList<>();
for (int fork = 0; fork < numberOfForks; fork++) {
listOfLists.addAll(testLister.getTestsForFork(fork, numberOfForks, 0));
}
Assert.assertThat(listOfLists.size(), CoreMatchers.is(tests.size()));
Assert.assertThat(new HashSet<>(listOfLists).size(), CoreMatchers.is(tests.size()));
Assert.assertThat(listOfLists.stream().sorted().collect(Collectors.toList()), is(equalTo(tests.stream().sorted().collect(Collectors.toList()))));
}
}
}
}

View File

@ -4,47 +4,16 @@ import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.NodeWithInfo
import net.corda.node.services.Permissions
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assume.assumeFalse
import org.junit.Before
import org.junit.Test
class FlowsExecutionModeRpcTest {
@Test
fun `persistent state survives node restart`() {
// Temporary disable this test when executed on Windows. It is known to be sporadically failing.
// More investigation is needed to establish why.
assumeFalse(System.getProperty("os.name").toLowerCase().startsWith("win"))
val user = User("mark", "dadada", setOf(invokeRpc("setFlowsDrainingModeEnabled"), invokeRpc("isFlowsDrainingModeEnabled")))
driver(DriverParameters(inMemoryDB = false, startNodesInProcess = true, notarySpecs = emptyList())) {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
nodeHandle.rpc.setFlowsDrainingModeEnabled(true)
nodeHandle.stop()
nodeName
}()
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user)).getOrThrow()
assertThat(nodeHandle.rpc.isFlowsDrainingModeEnabled()).isEqualTo(true)
nodeHandle.stop()
}
}
}
class FlowsExecutionModeTests : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))

View File

@ -15,6 +15,7 @@ guavaVersion=28.0-jre
quasarVersion=0.7.10
proguardVersion=6.1.1
bouncycastleVersion=1.60
classgraphVersion=4.8.41
disruptorVersion=3.4.2
typesafeConfigVersion=1.3.4
jsr305Version=3.0.2

View File

@ -75,6 +75,8 @@ dependencies {
testCompile "org.mockito:mockito-core:$mockito_version"
testCompile "org.assertj:assertj-core:$assertj_version"
testCompile "com.natpryce:hamkrest:$hamkrest_version"
testCompile 'org.hamcrest:hamcrest-library:2.1'
}
// TODO Consider moving it to quasar-utils in the future (introduced with PR-1388)

View File

@ -1,7 +1,3 @@
plugins {
id 'com.bmuschko.docker-remote-api' version '3.4.4'
}
evaluationDependsOn(":node:capsule")
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
@ -43,7 +39,7 @@ shadowJar {
docker{
registryCredentials {
url = System.env.DOCKER_URL
url = System.env.DOCKER_URL ?: "hub.docker.com"
username = System.env.DOCKER_USERNAME
password = System.env.DOCKER_PASSWORD
}

View File

@ -1,5 +1,6 @@
#Wed Aug 21 10:48:19 BST 2019
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME

View File

@ -4,8 +4,13 @@ buildscript {
file("$projectDir/src/main/resources/build.properties").withInputStream { properties.load(it) }
ext.jolokia_version = properties.getProperty('jolokiaAgentVersion')
dependencies {
classpath group: 'com.github.docker-java', name: 'docker-java', version: '3.1.5'
}
}
plugins {
id 'com.google.cloud.tools.jib' version '0.9.4'
}
@ -25,6 +30,9 @@ description 'Corda node modules'
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
slowIntegrationTestCompile.extendsFrom testCompile
slowIntegrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}
sourceSets {
@ -43,13 +51,28 @@ sourceSets {
srcDir file('src/integration-test/resources')
}
}
slowIntegrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test-slow/kotlin')
}
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test-slow/java')
}
resources {
srcDir file('src/integration-test-slow/resources')
}
}
}
jib.container {
mainClass = "net.corda.node.Corda"
args = ['--log-to-console', '--no-local-shell', '--config-file=/config/node.conf']
// The Groovy string needs to be converted to a `java.lang.String` below.
jvmFlags = ['-Xmx1g', "-javaagent:/app/libs/quasar-core-${quasar_version}-jdk8.jar".toString()]
mainClass = "net.corda.node.Corda"
args = ['--log-to-console', '--no-local-shell', '--config-file=/config/node.conf']
// The Groovy string needs to be converted to a `java.lang.String` below.
jvmFlags = ['-Xmx1g', "-javaagent:/app/libs/quasar-core-${quasar_version}-jdk8.jar".toString()]
}
// Use manual resource copying of log4j2.xml rather than source sets.
@ -74,7 +97,7 @@ dependencies {
compile project(':common-validation')
compile project(':common-configuration-parsing')
compile project(':common-logging')
// Backwards compatibility goo: Apps expect confidential-identities to be loaded by default.
// We could eventually gate this on a target-version check.
compile project(':confidential-identities')
@ -107,7 +130,7 @@ dependencies {
compile "commons-beanutils:commons-beanutils:${beanutils_version}"
compile "org.apache.activemq:artemis-server:${artemis_version}"
compile "org.apache.activemq:artemis-core-client:${artemis_version}"
runtime ("org.apache.activemq:artemis-amqp-protocol:${artemis_version}") {
runtime("org.apache.activemq:artemis-amqp-protocol:${artemis_version}") {
// Gains our proton-j version from core module.
exclude group: 'org.apache.qpid', module: 'proton-j'
}
@ -201,6 +224,13 @@ dependencies {
testCompile(project(':test-cli'))
testCompile(project(':test-utils'))
slowIntegrationTestCompile sourceSets.main.output
slowIntegrationTestCompile sourceSets.test.output
slowIntegrationTestCompile configurations.compile