use zulu for jdk in testing image (#5583)

* use zulu for jdk
add some parallel groups

* port kubesTest to Java
remove asterix from tests listed by ListTests, instead add after allocation
This commit is contained in:
Stefano Franz 2019-10-14 13:35:22 +01:00 committed by GitHub
parent 970f60c625
commit 6e98adb085
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 406 additions and 363 deletions

View File

@ -612,6 +612,14 @@ task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest"
numberOfShards 4
streamOutput true
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.METHOD
}
task allParallelUnitTest(type: ParallelTestGroup) {
testGroups "test"
numberOfShards 10
@ -623,7 +631,15 @@ task allParallelUnitTest(type: ParallelTestGroup) {
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
numberOfShards 15
streamOutput true
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
numberOfShards 5
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS

View File

@ -51,13 +51,12 @@ public class BucketingAllocator {
System.out.println("Number of tests: " + container.testsForFork.stream().mapToInt(b -> b.foundTests.size()).sum());
System.out.println("Tests to Run: ");
container.testsForFork.forEach(tb -> {
System.out.println(tb.nameWithAsterix);
System.out.println(tb.testName);
tb.foundTests.forEach(ft -> System.out.println("\t" + ft.getFirst() + ", " + ft.getSecond()));
});
});
}
private void allocateTestsToForks(@NotNull List<TestBucket> matchedTests) {
matchedTests.forEach(matchedTestBucket -> {
TestsForForkContainer smallestContainer = Collections.min(forkContainers, Comparator.comparing(TestsForForkContainer::getCurrentDuration));
@ -69,10 +68,9 @@ public class BucketingAllocator {
return allDiscoveredTests.stream().map(tuple -> {
String testName = tuple.getFirst();
Object task = tuple.getSecond();
String noAsterixName = testName.substring(0, testName.length() - 1);
//2DO [can this filtering algorithm be improved - the test names are sorted, it should be possible to do something using binary search]
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(noAsterixName)).collect(Collectors.toList());
return new TestBucket(task, testName, noAsterixName, matchingTests);
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(testName)).collect(Collectors.toList());
return new TestBucket(task, testName, matchingTests);
}).sorted(Comparator.comparing(TestBucket::getDuration).reversed()).collect(Collectors.toList());
}
@ -81,22 +79,20 @@ public class BucketingAllocator {
TestLister lister = source.getFirst();
Object testTask = source.getSecond();
return lister.getAllTestsDiscovered().stream().map(test -> new Tuple2<>(test, testTask)).collect(Collectors.toList());
}).flatMap(Collection::stream).collect(Collectors.toList());
}).flatMap(Collection::stream).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());
}
public static class TestBucket {
final Object testTask;
final String nameWithAsterix;
final String nameWithoutAsterix;
final String testName;
final List<Tuple2<String, Double>> foundTests;
final Double duration;
public TestBucket(Object testTask, String nameWithAsterix, String nameWithoutAsterix, List<Tuple2<String, Double>> foundTests) {
public TestBucket(Object testTask, String testName, List<Tuple2<String, Double>> foundTests) {
this.testTask = testTask;
this.nameWithAsterix = nameWithAsterix;
this.nameWithoutAsterix = nameWithoutAsterix;
this.testName = testName;
this.foundTests = foundTests;
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 10)).sum(), 10);
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 1)).sum(), 1);
}
public Double getDuration() {
@ -107,8 +103,7 @@ public class BucketingAllocator {
public String toString() {
return "TestBucket{" +
"testTask=" + testTask +
", nameWithAsterix='" + nameWithAsterix + '\'' +
", nameWithoutAsterix='" + nameWithoutAsterix + '\'' +
", nameWithAsterix='" + testName + '\'' +
", foundTests=" + foundTests +
", duration=" + duration +
'}';
@ -142,7 +137,7 @@ public class BucketingAllocator {
}
public List<String> getTestsForTask(Object task) {
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.nameWithAsterix).collect(Collectors.toList());
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.testName).collect(Collectors.toList());
}
public List<TestBucket> getBucketsForFork() {

View File

@ -41,8 +41,8 @@ public class BucketingAllocatorTask extends DefaultTask {
this.dependsOn(source);
}
public List<String> getTestsForForkAndTestTask(Integer fork, Test testTask) {
return allocator.getTestsForForkAndTestTask(fork, testTask);
public List<String> getTestIncludesForForkAndTestTask(Integer fork, Test testTask) {
return allocator.getTestsForForkAndTestTask(fork, testTask).stream().map(t -> t + "*").collect(Collectors.toList());
}
@TaskAction
@ -56,11 +56,11 @@ public class BucketingAllocatorTask extends DefaultTask {
String duration = "Duration(ms)";
List<CSVRecord> records = CSVFormat.DEFAULT.withHeader().parse(reader).getRecords();
return records.stream().map(record -> {
try{
try {
String testName = record.get(name);
String testDuration = record.get(duration);
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 10));
}catch (IllegalArgumentException | IllegalStateException e){
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 1));
} catch (IllegalArgumentException | IllegalStateException e) {
return null;
}
}).filter(Objects::nonNull).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());

View File

@ -131,7 +131,7 @@ class DistributedTesting implements Plugin<Project> {
filter {
def fork = getPropertyAsInt(subProject, "dockerFork", 0)
subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (idx: ${fork})")
List<String> includes = globalAllocator.getTestsForForkAndTestTask(
List<String> includes = globalAllocator.getTestIncludesForForkAndTestTask(
fork,
task)
subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}"

View File

@ -1,325 +0,0 @@
package net.corda.testing
import io.fabric8.kubernetes.api.model.*
import io.fabric8.kubernetes.client.*
import io.fabric8.kubernetes.client.dsl.ExecListener
import io.fabric8.kubernetes.client.dsl.ExecWatch
import io.fabric8.kubernetes.client.utils.Serialization
import okhttp3.Response
import org.gradle.api.DefaultTask
import org.gradle.api.GradleException
import org.gradle.api.tasks.TaskAction
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.IntStream
class KubesTest extends DefaultTask {
static final ExecutorService executorService = Executors.newCachedThreadPool()
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor()
String dockerTag
String fullTaskToExecutePath
String taskToExecuteName
Boolean printOutput = false
Integer numberOfCoresPerFork = 4
Integer memoryGbPerFork = 6
public volatile List<File> testOutput = Collections.emptyList()
public volatile List<KubePodResult> containerResults = Collections.emptyList()
String namespace = "thisisatest"
int k8sTimeout = 50 * 1_000
int webSocketTimeout = k8sTimeout * 6
int numberOfPods = 20
int timeoutInMinutesForPodToStart = 60
Distribution distribution = Distribution.METHOD
@TaskAction
void runTestsOnKubes() {
try {
Class.forName("org.apache.commons.compress.archivers.tar.TarArchiveInputStream")
} catch (ClassNotFoundException ignored) {
throw new GradleException("Apache Commons compress has not be loaded, this can happen if running from within intellj - please select \"delegate to gradle\" for build and test actions")
}
def buildId = System.getProperty("buildId") ? System.getProperty("buildId") :
(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0")
def currentUser = System.getProperty("user.name") ? System.getProperty("user.name") : "UNKNOWN_USER"
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase()
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase()
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
.withConnectionTimeout(k8sTimeout)
.withRequestTimeout(k8sTimeout)
.withRollingTimeout(k8sTimeout)
.withWebsocketTimeout(webSocketTimeout)
.withWebsocketPingInterval(webSocketTimeout)
.build()
final KubernetesClient client = new DefaultKubernetesClient(config)
try {
client.pods().inNamespace(namespace).list().getItems().forEach({ podToDelete ->
if (podToDelete.getMetadata().name.contains(stableRunId)) {
project.logger.lifecycle("deleting: " + podToDelete.getMetadata().getName())
client.resource(podToDelete).delete()
}
})
} catch (Exception ignored) {
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
}
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj({ i ->
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase()
String podName = potentialPodName.substring(0, Math.min(potentialPodName.size(), 62))
runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3)
}).collect(Collectors.toList())
this.testOutput = Collections.synchronizedList(futures.collect { it -> it.get().binaryResults }.flatten())
this.containerResults = futures.collect { it -> it.get() }
}
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries) {
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<KubePodResult>()
executorService.submit({
int tryCount = 0
Pod createdPod = null
while (tryCount < numberOfRetries) {
try {
Pod podRequest = buildPod(podName)
project.logger.lifecycle("requesting pod: " + podName)
createdPod = client.pods().inNamespace(namespace).create(podRequest)
project.logger.lifecycle("scheduled pod: " + podName)
File outputFile = Files.createTempFile("container", ".log").toFile()
attachStatusListenerToPod(client, namespace, podName)
schedulePodForDeleteOnShutdown(podName, client, createdPod)
waitForPodToStart(podName, client, namespace)
def stdOutOs = new PipedOutputStream()
def stdOutIs = new PipedInputStream(4096)
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
KubePodResult result = new KubePodResult(createdPod, null, outputFile)
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>()
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result)
stdOutIs.connect(stdOutOs)
String[] buildCommand = getBuildCommand(numberOfPods, podIdx)
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener).exec(buildCommand)
startLogPumping(outputFile, stdOutIs, podIdx, printOutput)
KubePodResult execResult = waiter.join()
project.logger.lifecycle("build has ended on on pod ${podName} (${podIdx}/${numberOfPods})")
project.logger.lifecycle "Gathering test results from ${execResult.createdPod.metadata.name}"
def binaryResults = downloadTestXmlFromPod(client, namespace, execResult.createdPod)
project.logger.lifecycle("deleting: " + execResult.createdPod.getMetadata().getName())
client.resource(execResult.createdPod).delete()
result.binaryResults = binaryResults
toReturn.complete(result)
break
} catch (Exception e) {
logger.error("Encountered error during testing cycle on pod ${podName} (${podIdx}/${numberOfPods})", e)
try {
if (createdPod) {
client.pods().delete(createdPod)
while (client.pods().inNamespace(namespace).list().getItems().find { p -> p.metadata.name == podName }) {
logger.warn("pod ${podName} has not been deleted, waiting 1s")
Thread.sleep(1000)
}
}
} catch (Exception ignored) {
}
tryCount++
logger.lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times")
}
}
if (tryCount >= numberOfRetries) {
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"))
}
})
return toReturn
}
void startLogPumping(File outputFile, stdOutIs, podIdx, boolean printOutput) {
Thread loggingThread = new Thread({ ->
BufferedWriter out = null
BufferedReader br = null
try {
out = new BufferedWriter(new FileWriter(outputFile))
br = new BufferedReader(new InputStreamReader(stdOutIs))
String line
while ((line = br.readLine()) != null) {
def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim()
if (printOutput) {
project.logger.lifecycle(toWrite)
}
out.println(toWrite)
}
} catch (IOException ignored) {
}
finally {
out?.close()
br?.close()
}
})
loggingThread.setDaemon(true)
loggingThread.start()
}
ExecListener buildExecListenerForPod(podName, errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
new ExecListener() {
final Long start = System.currentTimeMillis()
@Override
void onOpen(Response response) {
project.logger.lifecycle("Build started on pod $podName")
}
@Override
void onFailure(Throwable t, Response response) {
project.logger.lifecycle("Received error from rom pod $podName")
waitingFuture.completeExceptionally(t)
}
@Override
void onClose(int code, String reason) {
project.logger.lifecycle("Received onClose() from pod ${podName}, build took: ${(System.currentTimeMillis() - start) / 1000} seconds")
try {
def errChannelContents = errChannelStream.toString()
Status status = Serialization.unmarshal(errChannelContents, Status.class);
result.resultCode = status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0
waitingFuture.complete(result)
} catch (Exception e) {
waitingFuture.completeExceptionally(e)
}
}
}
}
void schedulePodForDeleteOnShutdown(String podName, client, Pod createdPod) {
project.logger.info("attaching shutdown hook for pod ${podName}")
Runtime.getRuntime().addShutdownHook({
println "Deleting pod: " + podName
client.pods().delete(createdPod)
})
}
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
@Override
void eventReceived(Watcher.Action action, Pod resource) {
project.logger.lifecycle("[StatusChange] pod ${resource.getMetadata().getName()} ${action.name()} (${resource.status.phase})")
}
@Override
void onClose(KubernetesClientException cause) {
}
})
}
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build")
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES)
project.logger.lifecycle("pod " + podName + " has started, executing build")
}
Pod buildPod(String podName) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withPath("/tmp/gradle")
.withType("DirectoryOrCreate")
.endHostPath()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity("${numberOfCoresPerFork}"))
.addToRequests("memory", new Quantity("${memoryGbPerFork}Gi"))
.endResources()
.addNewVolumeMount()
.withName("gradlecache")
.withMountPath("/tmp/gradle")
.endVolumeMount()
.endContainer()
.withImagePullSecrets(new LocalObjectReference("regcred"))
.withRestartPolicy("Never")
.endSpec()
.build()
}
String[] getBuildCommand(int numberOfPods, int podIdx) {
return ["bash", "-c",
"let x=1 ; while [ \${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=\$? ; sleep 1 ; done ; " +
"cd /tmp/source ; " +
"let y=1 ; while [ \${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=\$? ; sleep 1 ; done ;" +
"./gradlew -D${ListTests.DISTRIBUTION_PROPERTY}=${distribution.name()} -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " $fullTaskToExecutePath --info 2>&1 ;" +
"let rs=\$? ; sleep 10 ; exit \${rs}"]
}
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports"
String binaryResultsFile = "results.bin"
String podName = cp.getMetadata().getName()
Path tempDir = new File(new File(project.getBuildDir(), "test-results-xml"), podName).toPath()
if (!tempDir.toFile().exists()) {
tempDir.toFile().mkdirs()
}
project.logger.lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath())
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir)
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile)
}
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start))
List<File> folders = new ArrayList<>()
while (!filesToInspect.isEmpty()) {
File fileToInspect = filesToInspect.poll()
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
folders.add(fileToInspect.parentFile)
}
if (fileToInspect.isDirectory()) {
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()))
}
}
return folders
}
}

View File

@ -0,0 +1,339 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.utils.Serialization;
import okhttp3.Response;
import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;
import org.jetbrains.annotations.NotNull;
import java.io.*;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class KubesTest extends DefaultTask {
static final ExecutorService executorService = Executors.newCachedThreadPool();
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor();
String dockerTag;
String fullTaskToExecutePath;
String taskToExecuteName;
Boolean printOutput = false;
Integer numberOfCoresPerFork = 4;
Integer memoryGbPerFork = 6;
public volatile List<File> testOutput = Collections.emptyList();
public volatile List<KubePodResult> containerResults = Collections.emptyList();
String namespace = "thisisatest";
int k8sTimeout = 50 * 1_000;
int webSocketTimeout = k8sTimeout * 6;
int numberOfPods = 20;
int timeoutInMinutesForPodToStart = 60;
Distribution distribution = Distribution.METHOD;
@TaskAction
public void runDistributedTests() {
String buildId = System.getProperty("buildId", "0");
String currentUser = System.getProperty("user.name", "UNKNOWN_USER");
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase();
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase();
final KubernetesClient client = getKubernetesClient();
try {
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
client.resource(podToDelete).delete();
}
});
} catch (Exception ignored) {
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
}
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase();
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
return runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
}).collect(Collectors.toList());
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
try {
return it.get().getBinaryResults();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).flatMap(Collection::stream).collect(Collectors.toList()));
this.containerResults = futures.stream().map(it -> {
try {
return it.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}
@NotNull
KubernetesClient getKubernetesClient() {
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
.withConnectionTimeout(k8sTimeout)
.withRequestTimeout(k8sTimeout)
.withRollingTimeout(k8sTimeout)
.withWebsocketTimeout(webSocketTimeout)
.withWebsocketPingInterval(webSocketTimeout)
.build();
return new DefaultKubernetesClient(config);
}
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries) {
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<>();
executorService.submit(() -> {
int tryCount = 0;
Pod createdPod = null;
while (tryCount < numberOfRetries) {
try {
Pod podRequest = buildPod(podName);
getProject().getLogger().lifecycle("requesting pod: " + podName);
createdPod = client.pods().inNamespace(namespace).create(podRequest);
getProject().getLogger().lifecycle("scheduled pod: " + podName);
File outputFile = Files.createTempFile("container", ".log").toFile();
attachStatusListenerToPod(client, namespace, podName);
schedulePodForDeleteOnShutdown(podName, client, createdPod);
waitForPodToStart(podName, client, namespace);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
KubePodResult result = new KubePodResult(createdPod, outputFile);
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result);
stdOutIs.connect(stdOutOs);
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
.writingOutput(stdOutOs)
.writingErrorChannel(errChannelStream)
.usingListener(execListener).exec(buildCommand);
startLogPumping(outputFile, stdOutIs, podIdx, printOutput);
KubePodResult execResult = waiter.join();
getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ")");
getLogger().lifecycle("Gathering test results from " + execResult.getCreatedPod().getMetadata().getName());
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, execResult.getCreatedPod());
getLogger().lifecycle("deleting: " + execResult.getCreatedPod().getMetadata().getName());
client.resource(execResult.getCreatedPod()).delete();
result.setBinaryResults(binaryResults);
toReturn.complete(result);
break;
} catch (Exception e) {
getLogger().error("Encountered error during testing cycle on pod " + podName + " (" + podIdx / numberOfPods + ")", e);
try {
if (createdPod != null) {
client.pods().delete(createdPod);
while (client.pods().inNamespace(namespace).list().getItems().stream().anyMatch(p -> Objects.equals(p.getMetadata().getName(), podName))) {
getLogger().warn("pod " + podName + " has not been deleted, waiting 1s");
Thread.sleep(1000);
}
}
} catch (Exception ignored) {
}
tryCount++;
getLogger().lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times");
}
}
if (tryCount >= numberOfRetries) {
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"));
}
});
return toReturn;
}
Pod buildPod(String podName) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
.withNewSpec()
.addNewVolume()
.withName("gradlecache")
.withNewHostPath()
.withPath("/tmp/gradle")
.withType("DirectoryOrCreate")
.endHostPath()
.endVolume()
.addNewContainer()
.withImage(dockerTag)
.withCommand("bash")
.withArgs("-c", "sleep 3600")
.addNewEnv()
.withName("DRIVER_NODE_MEMORY")
.withValue("1024m")
.withName("DRIVER_WEB_MEMORY")
.withValue("1024m")
.endEnv()
.withName(podName)
.withNewResources()
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
.addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi"))
.endResources()
.addNewVolumeMount()
.withName("gradlecache")
.withMountPath("/tmp/gradle")
.endVolumeMount()
.endContainer()
.withImagePullSecrets(new LocalObjectReference("regcred"))
.withRestartPolicy("Never")
.endSpec()
.build();
}
void startLogPumping(File outputFile, InputStream stdOutIs, Integer podIdx, boolean printOutput) {
Thread loggingThread = new Thread(() -> {
try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile));
BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) {
String line;
while ((line = br.readLine()) != null) {
String toWrite = ("Container" + podIdx + ": " + line).trim();
if (printOutput) {
getProject().getLogger().lifecycle(toWrite);
}
out.write(line);
out.newLine();
}
} catch (IOException ignored) {
}
});
loggingThread.setDaemon(true);
loggingThread.start();
}
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
return client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
@Override
public void eventReceived(Watcher.Action action, Pod resource) {
getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name() + " (" + resource.getStatus().getPhase() + ")");
}
@Override
public void onClose(KubernetesClientException cause) {
}
});
}
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
getProject().getLogger().lifecycle("Waiting for pod " + podName + " to start before executing build");
try {
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
getProject().getLogger().lifecycle("pod " + podName + " has started, executing build");
}
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports";
String binaryResultsFile = "results.bin";
String podName = cp.getMetadata().getName();
Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath();
if (!tempDir.toFile().exists()) {
tempDir.toFile().mkdirs();
}
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
client.pods()
.inNamespace(namespace)
.withName(podName)
.dir(resultsInContainerPath)
.copy(tempDir);
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
}
String[] getBuildCommand(int numberOfPods, int podIdx) {
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
"let rs=$? ; sleep 10 ; exit ${rs}";
return new String[]{"bash", "-c", shellScript};
}
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
List<File> folders = new ArrayList<>();
while (!filesToInspect.isEmpty()) {
File fileToInspect = filesToInspect.poll();
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
folders.add(fileToInspect.getParentFile());
}
if (fileToInspect.isDirectory()) {
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()));
}
}
return folders;
}
void schedulePodForDeleteOnShutdown(String podName, KubernetesClient client, Pod createdPod) {
getProject().getLogger().info("attaching shutdown hook for pod " + podName);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Deleting pod: " + podName);
client.pods().delete(createdPod);
}));
}
ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
return new ExecListener() {
final Long start = System.currentTimeMillis();
@Override
public void onOpen(Response response) {
getProject().getLogger().lifecycle("Build started on pod " + podName);
}
@Override
public void onFailure(Throwable t, Response response) {
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
waitingFuture.completeExceptionally(t);
}
@Override
public void onClose(int code, String reason) {
getProject().getLogger().lifecycle("Received onClose() from pod " + podName + " , build took: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
try {
String errChannelContents = errChannelStream.toString();
Status status = Serialization.unmarshal(errChannelContents, Status.class);
Integer resultCode = Optional.ofNullable(status).map(Status::getDetails)
.map(StatusDetails::getCauses)
.flatMap(c -> c.stream().findFirst())
.map(StatusCause::getMessage)
.map(Integer::parseInt).orElse(0);
result.setResultCode(resultCode);
waitingFuture.complete(result);
} catch (Exception e) {
waitingFuture.completeExceptionally(e);
}
}
};
}
}

View File

@ -79,7 +79,7 @@ class ListTests extends DefaultTask implements TestLister {
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
.flatten()
.collect { ClassInfo c ->
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name + "*" }
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name }
}.flatten()
.toSet()
@ -97,7 +97,7 @@ class ListTests extends DefaultTask implements TestLister {
.getClassesWithMethodAnnotation("org.junit.Test")
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
.flatten()
.collect { ClassInfo c -> c.name + "*" }.flatten()
.collect { ClassInfo c -> c.name }.flatten()
.toSet()
this.allTests = results.stream().sorted().collect(Collectors.toList())
break

View File

@ -5,19 +5,16 @@ import io.fabric8.kubernetes.api.model.Pod;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
public class KubePodResult {
private final Pod createdPod;
private final CompletableFuture<Void> waiter;
private volatile Integer resultCode = 255;
private final File output;
private volatile Collection<File> binaryResults = Collections.emptyList();
KubePodResult(Pod createdPod, CompletableFuture<Void> waiter, File output) {
KubePodResult(Pod createdPod, File output) {
this.createdPod = createdPod;
this.waiter = waiter;
this.output = output;
}
@ -34,8 +31,22 @@ public class KubePodResult {
}
public File getOutput() {
return output;
}
public Pod getCreatedPod() {
return createdPod;
}
public Collection<File> getBinaryResults() {
synchronized (createdPod) {
return output;
return binaryResults;
}
}
public void setBinaryResults(Collection<File> binaryResults) {
synchronized (createdPod) {
this.binaryResults = binaryResults;
}
}
};

View File

@ -21,12 +21,12 @@ public class BucketingAllocatorTest {
BucketingAllocator bucketingAllocator = new BucketingAllocator(1, Collections::emptyList);
Object task = new Object();
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);
bucketingAllocator.generateTestPlan();
List<String> testsForForkAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);
Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));
}
@ -37,7 +37,7 @@ public class BucketingAllocatorTest {
BucketingAllocator bucketingAllocator = new BucketingAllocator(2, Collections::emptyList);
Object task = new Object();
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);
bucketingAllocator.generateTestPlan();
List<String> testsForForkOneAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);
@ -48,7 +48,7 @@ public class BucketingAllocatorTest {
List<String> allTests = Stream.of(testsForForkOneAndTestTask, testsForForkTwoAndTestTask).flatMap(Collection::stream).collect(Collectors.toList());
Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));
}
}

View File

@ -10,7 +10,6 @@ import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@Ignore
class NodeRPCTests {
private val CORDA_VERSION_REGEX = "\\d+(\\.\\d+)?(-\\w+)?".toRegex()
private val CORDA_VENDOR = "Corda Open Source"
@ -28,7 +27,6 @@ class NodeRPCTests {
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = CORDAPPS, extraCordappPackagesToScan = emptyList())) {
val nodeDiagnosticInfo = startNode().get().rpc.nodeDiagnosticInfo()
assertTrue(nodeDiagnosticInfo.version.matches(CORDA_VERSION_REGEX))
assertTrue(nodeDiagnosticInfo.revision.matches(HEXADECIMAL_REGEX))
assertEquals(PLATFORM_VERSION, nodeDiagnosticInfo.platformVersion)
assertEquals(CORDA_VENDOR, nodeDiagnosticInfo.vendor)
nodeDiagnosticInfo.cordapps.forEach { println("${it.shortName} ${it.type}") }

View File

@ -3,8 +3,17 @@ ENV GRADLE_USER_HOME=/tmp/gradle
RUN mkdir /tmp/gradle && mkdir -p /home/root/.m2/repository
RUN apt-get update && apt-get install -y curl libatomic1 && \
curl -O https://d3pxv6yz143wms.cloudfront.net/8.222.10.1/java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
apt-get install -y java-common && dpkg -i java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
apt-get install -y java-common && apt install -y ./zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
apt-get clean && \
mkdir -p /tmp/source
rm -f zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
mv /zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz /usr/lib/jvm/ && \
cd /usr/lib/jvm/ && \
tar -zxvf zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
rm -rf zulu-8-amd64 && \
mv zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64 zulu-8-amd64 && \
rm -f zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
cd / && mkdir -p /tmp/source