TM-40 Ephemeral workspace for k8s workers that survives restarts (#5567)

* Simplify

* Mount shared dir to worker

* format

* podnames with separators

* refactor parameters

* Use PVC for storage

* pvc in namespace

* KubesTest simplify

* no tolowercase

* no private

* lowercase

* RetryStrategy

* minor changes

* wait forever

* undo .idea

* elvis

* add comment

* regcred

* use correct ConfigBuilder

* delete java, will migrate later

* Revert "delete java, will migrate later"

This reverts commit e3bab1f3

* Merging changes in groovy to new java file

* format

* rename variable

* fix log

* private

* remove bak

* move java files

* Revert "move java files"

This reverts commit 89aa4c35
This commit is contained in:
Zoltan Kiss 2019-10-15 10:52:44 +01:00 committed by Stefano Franz
parent 858ec29953
commit a1dd6abe17
5 changed files with 216 additions and 141 deletions

View File

@ -3,6 +3,10 @@
<option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" />
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />

View File

@ -3,8 +3,9 @@ package net.corda.testing;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.*;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import net.corda.testing.retry.Retry;
import okhttp3.Response;
import org.gradle.api.DefaultTask;
import org.gradle.api.tasks.TaskAction;
@ -21,8 +22,11 @@ import;
public class KubesTest extends DefaultTask {
static final ExecutorService executorService = Executors.newCachedThreadPool();
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor();
private static final ExecutorService executorService = Executors.newCachedThreadPool();
* Name of the k8s Secret object that holds the credentials to access the docker image registry
private static final String REGISTRY_CREDENTIALS_SECRET_NAME = "regcred";
String dockerTag;
String fullTaskToExecutePath;
@ -46,12 +50,11 @@ public class KubesTest extends DefaultTask {
String buildId = System.getProperty("buildId", "0");
String currentUser = System.getProperty("", "UNKNOWN_USER");
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase();
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase();
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
String random = rnd64Base36(new Random());
final KubernetesClient client = getKubernetesClient();
try {
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
@ -63,10 +66,9 @@ public class KubesTest extends DefaultTask {
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase();
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
return runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i;
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
this.testOutput = Collections.synchronizedList( -> {
@ -86,7 +88,7 @@ public class KubesTest extends DefaultTask {
KubernetesClient getKubernetesClient() {
private KubernetesClient getKubernetesClient() {
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
@ -98,85 +100,137 @@ public class KubesTest extends DefaultTask {
return new DefaultKubernetesClient(config);
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries) {
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<>();
executorService.submit(() -> {
int tryCount = 0;
Pod createdPod = null;
while (tryCount < numberOfRetries) {
try {
Pod podRequest = buildPod(podName);
getProject().getLogger().lifecycle("requesting pod: " + podName);
createdPod = client.pods().inNamespace(namespace).create(podRequest);
getProject().getLogger().lifecycle("scheduled pod: " + podName);
File outputFile = Files.createTempFile("container", ".log").toFile();
attachStatusListenerToPod(client, namespace, podName);
schedulePodForDeleteOnShutdown(podName, client, createdPod);
waitForPodToStart(podName, client, namespace);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
KubePodResult result = new KubePodResult(createdPod, outputFile);
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result);
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
startLogPumping(outputFile, stdOutIs, podIdx, printOutput);
KubePodResult execResult = waiter.join();
getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ")");
getLogger().lifecycle("Gathering test results from " + execResult.getCreatedPod().getMetadata().getName());
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, execResult.getCreatedPod());
getLogger().lifecycle("deleting: " + execResult.getCreatedPod().getMetadata().getName());
} catch (Exception e) {
getLogger().error("Encountered error during testing cycle on pod " + podName + " (" + podIdx / numberOfPods + ")", e);
try {
if (createdPod != null) {
while (client.pods().inNamespace(namespace).list().getItems().stream().anyMatch(p -> Objects.equals(p.getMetadata().getName(), podName))) {
getLogger().warn("pod " + podName + " has not been deleted, waiting 1s");
} catch (Exception ignored) {
getLogger().lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times");
if (tryCount >= numberOfRetries) {
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"));
return toReturn;
private static String rnd64Base36(Random rnd) {
return new BigInteger(64, rnd)
private Future<KubePodResult> submitBuild(
KubernetesClient client,
String namespace,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries
) {
return executorService.submit(new Callable<KubePodResult>() {
public KubePodResult call() throws Exception {
PersistentVolumeClaim pvc = createPvc(client, podName);
return buildRunPodWithRetriesOrThrow(client, namespace, pvc, numberOfPods, podIdx, podName, printOutput, numberOfRetries);
private static void addShutdownHook(Runnable hook) {
Runtime.getRuntime().addShutdownHook(new Thread(hook));
private PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
addShutdownHook(() -> {
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
return pvc;
private KubePodResult buildRunPodWithRetriesOrThrow(
KubernetesClient client,
String namespace,
PersistentVolumeClaim pvc,
int numberOfPods,
int podIdx,
String podName,
boolean printOutput,
int numberOfRetries
) {
addShutdownHook(() -> {
System.out.println("deleting pod: " + podName);
try {
// pods might die, so we retry
return Retry.fixed(numberOfRetries).run(() -> {
// remove pod if exists
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
if (oldPod.get() != null) {
getLogger().lifecycle("deleting pod: {}", podName);
while (oldPod.get() != null) {
getLogger().info("waiting for pod {} to be removed", podName);
// recreate and run
getProject().getLogger().lifecycle("creating pod: " + podName);
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName, pvc));
getProject().getLogger().lifecycle("scheduled pod: " + podName);
attachStatusListenerToPod(client, createdPod);
waitForPodToStart(client, createdPod);
PipedOutputStream stdOutOs = new PipedOutputStream();
PipedInputStream stdOutIs = new PipedInputStream(4096);
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
CompletableFuture<Integer> waiter = new CompletableFuture<>();
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
.exec(getBuildCommand(numberOfPods, podIdx));
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
int resCode = waiter.join();
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
return new KubePodResult(resCode, podOutput, binaryResults);
} catch (Retry.RetryException e) {
throw new RuntimeException("Failed to build in pod " + podName + " (" + podIdx + "/" + numberOfPods + ") in " + numberOfRetries + " attempts", e);
private Pod buildPodRequest(String podName, PersistentVolumeClaim pvc) {
return new PodBuilder()
Pod buildPod(String podName) {
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
@ -192,18 +246,19 @@ public class KubesTest extends DefaultTask {
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
.addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi"))
.withImagePullSecrets(new LocalObjectReference("regcred"))
void startLogPumping(File outputFile, InputStream stdOutIs, Integer podIdx, boolean printOutput) {
private File startLogPumping(InputStream stdOutIs, int podIdx, boolean printOutput) throws IOException {
File outputFile = Files.createTempFile("container", ".log").toFile();
Thread loggingThread = new Thread(() -> {
try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile));
BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) {
@ -222,10 +277,11 @@ public class KubesTest extends DefaultTask {
return outputFile;
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
return client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
public void eventReceived(Watcher.Action action, Pod resource) {
getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + + " (" + resource.getStatus().getPhase() + ")");
@ -237,17 +293,17 @@ public class KubesTest extends DefaultTask {
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
getProject().getLogger().lifecycle("Waiting for pod " + podName + " to start before executing build");
private void waitForPodToStart(KubernetesClient client, Pod pod) {
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
try {
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
getProject().getLogger().lifecycle("pod " + podName + " has started, executing build");
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
private Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
String resultsInContainerPath = "/tmp/source/build/test-reports";
String binaryResultsFile = "results.bin";
String podName = cp.getMetadata().getName();
@ -267,7 +323,7 @@ public class KubesTest extends DefaultTask {
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
String[] getBuildCommand(int numberOfPods, int podIdx) {
private String[] getBuildCommand(int numberOfPods, int podIdx) {
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
@ -275,7 +331,7 @@ public class KubesTest extends DefaultTask {
return new String[]{"bash", "-c", shellScript};
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
private List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
List<File> folders = new ArrayList<>();
while (!filesToInspect.isEmpty()) {
@ -291,15 +347,7 @@ public class KubesTest extends DefaultTask {
return folders;
void schedulePodForDeleteOnShutdown(String podName, KubernetesClient client, Pod createdPod) {
getProject().getLogger().info("attaching shutdown hook for pod " + podName);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Deleting pod: " + podName);
ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture) {
return new ExecListener() {
final Long start = System.currentTimeMillis();
@ -326,8 +374,7 @@ public class KubesTest extends DefaultTask {
.flatMap(c ->
} catch (Exception e) {
@ -335,5 +382,4 @@ public class KubesTest extends DefaultTask {

View File

@ -1,52 +1,29 @@
package net.corda.testing;
import io.fabric8.kubernetes.api.model.Pod;
import java.util.Collection;
import java.util.Collections;
public class KubePodResult {
private final Pod createdPod;
private volatile Integer resultCode = 255;
private final int resultCode;
private final File output;
private volatile Collection<File> binaryResults = Collections.emptyList();
private final Collection<File> binaryResults;
KubePodResult(Pod createdPod, File output) {
this.createdPod = createdPod;
public KubePodResult(int resultCode, File output, Collection<File> binaryResults) {
this.resultCode = resultCode;
this.output = output;
this.binaryResults = binaryResults;
public void setResultCode(Integer code) {
synchronized (createdPod) {
this.resultCode = code;
public Integer getResultCode() {
synchronized (createdPod) {
return this.resultCode;
public int getResultCode() {
return resultCode;
public File getOutput() {
return output;
public Pod getCreatedPod() {
return createdPod;
public Collection<File> getBinaryResults() {
synchronized (createdPod) {
return binaryResults;
return binaryResults;
public void setBinaryResults(Collection<File> binaryResults) {
synchronized (createdPod) {
this.binaryResults = binaryResults;

View File

@ -152,10 +152,10 @@ public class KubesReporting extends DefaultTask {
if (!containersWithNonZeroReturnCodes.isEmpty()) {
String reportUrl = new ConsoleRenderer().asClickableFileUrl(new File(destinationDir, "index.html"));
if (shouldPrintOutput){
containersWithNonZeroReturnCodes.forEach(container -> {
containersWithNonZeroReturnCodes.forEach(podResult -> {
try {
System.out.println("\n##### CONTAINER OUTPUT START #####");
IOUtils.copy(new FileInputStream(container.getOutput()), System.out);
IOUtils.copy(new FileInputStream(podResult.getOutput()), System.out);
System.out.println("##### CONTAINER OUTPUT END #####\n");
} catch (IOException ignored) {

View File

@ -0,0 +1,48 @@
package net.corda.testing.retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
public final class Retry {
private static final Logger log = LoggerFactory.getLogger(Retry.class);
public interface RetryStrategy {
<T> T run(Callable<T> op) throws RetryException;
public static final class RetryException extends RuntimeException {
public RetryException(String message) {
public RetryException(String message, Throwable cause) {
super(message, cause);
public static RetryStrategy fixed(int times) {
if (times < 1) throw new IllegalArgumentException();
return new RetryStrategy() {
public <T> T run(Callable<T> op) {
int run = 0;
Exception last = null;
while (run < times) {
try {
} catch (Exception e) {
last = e;"Exception caught: " + e.getMessage());
throw new RetryException("Operation failed " + run + " times", last);